From 0fd4413ca4b427238ceeba967dbd4931a9b2fab1 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 2 Jul 2024 13:14:05 -0700 Subject: [PATCH 01/15] feedback --- .../spark/sql/execution/streaming/ColumnFamilySchemaFactory.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaFactory.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaFactory.scala new file mode 100644 index 0000000000000..e69de29bb2d1d From bbb7091778ed2cb813a5963015d50f16d1af6e94 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 9 Jul 2024 10:43:50 -0700 Subject: [PATCH 02/15] removing println, test passes --- .../sql/execution/streaming/IncrementalExecution.scala | 6 ++++++ .../spark/sql/streaming/TransformWithStateSuite.scala | 1 - 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 4ce762714e864..337baa37e946d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -117,6 +117,12 @@ class IncrementalExecution( */ private val STATE_SCHEMA_DEFAULT_VERSION: Int = 2 + /** + * This value dictates which schema format version the state schema should be written in + * for all operators other than TransformWithState. + */ + private val STATE_SCHEMA_DEFAULT_VERSION: Int = 2 + /** * See [SPARK-18339] * Walk the optimized logical plan and replace CurrentBatchTimestamp diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index fd621fc1161f1..b0c913549506b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -884,7 +884,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest val columnFamilySchemas = fetchColumnFamilySchemas(chkptDir.getCanonicalPath, 0) assert(columnFamilySchemas.length == 1) - val expected = ColumnFamilySchemaV1( "countState", new StructType().add("key", From efd1a099f02d911bb7317b03a5f2e63bac8331ef Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 10 Jul 2024 10:23:24 -0700 Subject: [PATCH 03/15] keySchema --- .../apache/spark/sql/execution/streaming/state/StateStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 053112ebaa9ec..818488bc2266b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -293,7 +293,7 @@ object KeyStateEncoderSpec { // match on type m("keyStateEncoderType").asInstanceOf[String] match { case "NoPrefixKeyStateEncoderSpec" => - NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA) + NoPrefixKeyStateEncoderSpec(keySchema) case "RangeKeyScanStateEncoderSpec" => val 
orderingOrdinals = m("orderingOrdinals"). asInstanceOf[List[_]].map(_.asInstanceOf[BigInt].toInt) From 47ea28b284b51f862bee803a0b7310bdd37e7ebd Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 10 Jul 2024 15:52:15 -0700 Subject: [PATCH 04/15] rebase --- .../spark/sql/execution/streaming/state/StateStore.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 818488bc2266b..39937b6cdd6e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -293,14 +293,14 @@ object KeyStateEncoderSpec { // match on type m("keyStateEncoderType").asInstanceOf[String] match { case "NoPrefixKeyStateEncoderSpec" => - NoPrefixKeyStateEncoderSpec(keySchema) + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA) case "RangeKeyScanStateEncoderSpec" => val orderingOrdinals = m("orderingOrdinals"). asInstanceOf[List[_]].map(_.asInstanceOf[BigInt].toInt) RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) case "PrefixKeyScanStateEncoderSpec" => - val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt].toInt - PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, numColsPrefixKey) + val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt] + PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, numColsPrefixKey.toInt) } } } From fbe62d757fc6de6f17cf45d9b423599fd71694c1 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 10 Jul 2024 10:23:24 -0700 Subject: [PATCH 05/15] keySchema --- .../spark/sql/execution/streaming/state/StateStore.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 39937b6cdd6e5..d35f3c70e75ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo -import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA} import org.apache.spark.sql.types.StructType import org.apache.spark.util.{NextIterator, ThreadUtils, Utils} @@ -293,14 +292,14 @@ object KeyStateEncoderSpec { // match on type m("keyStateEncoderType").asInstanceOf[String] match { case "NoPrefixKeyStateEncoderSpec" => - NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA) + NoPrefixKeyStateEncoderSpec(keySchema) case "RangeKeyScanStateEncoderSpec" => val orderingOrdinals = m("orderingOrdinals"). 
asInstanceOf[List[_]].map(_.asInstanceOf[BigInt].toInt) RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) case "PrefixKeyScanStateEncoderSpec" => val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt] - PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, numColsPrefixKey.toInt) + PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey.toInt) } } } From 43bf78fd82db85867a5213981e82cb2f33d9dcef Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 10 Jul 2024 15:52:15 -0700 Subject: [PATCH 06/15] rebase --- .../sql/execution/streaming/state/StateSchemaV3File.scala | 3 +-- .../spark/sql/execution/streaming/state/StateStore.scala | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala index e7e94ae193664..4bd49d41c14a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala @@ -26,7 +26,6 @@ import scala.io.{Source => IOSource} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion @@ -40,7 +39,7 @@ import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVers */ class StateSchemaV3File( hadoopConf: Configuration, - path: String) extends Logging { + path: String) { val metadataPath = new Path(path) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index d35f3c70e75ff..39937b6cdd6e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA} import org.apache.spark.sql.types.StructType import org.apache.spark.util.{NextIterator, ThreadUtils, Utils} @@ -292,14 +293,14 @@ object KeyStateEncoderSpec { // match on type m("keyStateEncoderType").asInstanceOf[String] match { case "NoPrefixKeyStateEncoderSpec" => - NoPrefixKeyStateEncoderSpec(keySchema) + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA) case "RangeKeyScanStateEncoderSpec" => val orderingOrdinals = m("orderingOrdinals"). 
asInstanceOf[List[_]].map(_.asInstanceOf[BigInt].toInt) RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) case "PrefixKeyScanStateEncoderSpec" => val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt] - PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey.toInt) + PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, numColsPrefixKey.toInt) } } } From c3fc3168bb4a330cf47cf44f984449f1223671da Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 25 Jun 2024 15:40:18 -0700 Subject: [PATCH 07/15] writing schema --- .../streaming/MapStateImplWithTTL.scala | 10 ++ .../streaming/StateSchemaV3File.scala | 100 ++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala index 2ab06f36dd5f7..1d3c8e7a9747b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala @@ -24,6 +24,16 @@ import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoder import org.apache.spark.sql.streaming.{MapState, TTLConfig} import org.apache.spark.util.NextIterator +object MapStateImplWithTTL { + def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + COMPOSITE_KEY_ROW_SCHEMA, + VALUE_ROW_SCHEMA_WITH_TTL, + PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false) + } +} + /** * Class that provides a concrete implementation for map state associated with state * variables (with ttl expiration support) used in the streaming transformWithState operator. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala index e69de29bb2d1d..82bab9a5301f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{InputStream, OutputStream, StringReader} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream} +import org.json4s.JValue +import org.json4s.jackson.JsonMethods +import org.json4s.jackson.JsonMethods.{compact, render} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf + +class StateSchemaV3File( + hadoopConf: Configuration, + path: String, + metadataCacheEnabled: Boolean = false) + extends HDFSMetadataLog[JValue](hadoopConf, path, metadataCacheEnabled) { + + final val MAX_UTF_CHUNK_SIZE = 65535 + def this(sparkSession: SparkSession, path: String) = { + this( + sparkSession.sessionState.newHadoopConf(), + path, + metadataCacheEnabled = sparkSession.sessionState.conf.getConf( + SQLConf.STREAMING_METADATA_CACHE_ENABLED) + ) + } + + override protected def serialize(schema: JValue, out: OutputStream): Unit = { + val json = compact(render(schema)) + val buf = new Array[Char](MAX_UTF_CHUNK_SIZE) + + val outputStream = out.asInstanceOf[FSDataOutputStream] + // DataOutputStream.writeUTF can't write a string at once + // if the size exceeds 65535 (2^16 - 1) bytes. + // Each metadata consists of multiple chunks in schema version 3. + try { + val numMetadataChunks = (json.length - 1) / MAX_UTF_CHUNK_SIZE + 1 + val metadataStringReader = new StringReader(json) + outputStream.writeInt(numMetadataChunks) + (0 until numMetadataChunks).foreach { _ => + val numRead = metadataStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE) + outputStream.writeUTF(new String(buf, 0, numRead)) + } + outputStream.close() + } catch { + case e: Throwable => + throw e + } + } + + override protected def deserialize(in: InputStream): JValue = { + val buf = new StringBuilder + val inputStream = in.asInstanceOf[FSDataInputStream] + val numKeyChunks = inputStream.readInt() + (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF())) + val json = buf.toString() + JsonMethods.parse(json) + } + + override def add(batchId: Long, metadata: JValue): Boolean = { + require(metadata != null, "'null' metadata cannot written to a metadata log") + val batchMetadataFile = batchIdToPath(batchId) + if (fileManager.exists(batchMetadataFile)) { + fileManager.delete(batchMetadataFile) + } + val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) } + if (metadataCacheEnabled && res) batchCache.put(batchId, metadata) + res + } + + override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { + val batchMetadataFile = batchIdToPath(batchId) + + if (metadataCacheEnabled && batchCache.containsKey(batchId)) { + false + } else { + write(batchMetadataFile, fn) + true + } + } +} From 2dcd90e37a4f6770356d14890f14a2381b2aaeb2 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 11 Jul 2024 11:11:09 -0700 Subject: [PATCH 08/15] purging --- .../sql/execution/streaming/IncrementalExecution.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 337baa37e946d..4ce762714e864 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -117,12 +117,6 @@ class IncrementalExecution( */ private val 
STATE_SCHEMA_DEFAULT_VERSION: Int = 2 - /** - * This value dictates which schema format version the state schema should be written in - * for all operators other than TransformWithState. - */ - private val STATE_SCHEMA_DEFAULT_VERSION: Int = 2 - /** * See [SPARK-18339] * Walk the optimized logical plan and replace CurrentBatchTimestamp From 4ba3d582183179c11bbc6d63be151ff9fee041ca Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 15 Jul 2024 13:52:10 -0700 Subject: [PATCH 09/15] init --- .../sql/execution/streaming/TransformWithStateExec.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 0f43256e3b6c2..76cdbeedaafe7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -431,6 +431,12 @@ case class TransformWithStateExec( } } + private def validateMetadatas( + oldMetadata: OperatorStateMetadata, + newMetadata: OperatorStateMetadata): Unit = { + + } + /** Metadata of this stateful operator and its states stores. */ override def operatorStateMetadata( stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { From bc24c1fa3c5a5bce8a198e2fe6e0808610e808eb Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 15 Jul 2024 14:53:55 -0700 Subject: [PATCH 10/15] added test case and error msg --- .../resources/error/error-conditions.json | 6 +++ .../streaming/IncrementalExecution.scala | 6 +++ .../streaming/TransformWithStateExec.scala | 36 ++++++++++++++++-- .../streaming/state/StateStoreErrors.scala | 15 ++++++++ .../streaming/TransformWithStateSuite.scala | 38 ++++++++++++++++++- 5 files changed, 97 insertions(+), 4 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 7f54a77c94a0f..dc20e411f4cfa 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3804,6 +3804,12 @@ ], "sqlState" : "42802" }, + "STATE_STORE_INVALID_CONFIG_AFTER_RESTART" : { + "message" : [ + " is not equal to . Please set to , or restart with a new checkpoint directory." + ], + "sqlState" : "42K06" + }, "STATE_STORE_INVALID_PROVIDER" : { "message" : [ "The given State Store Provider does not extend org.apache.spark.sql.execution.streaming.state.StateStoreProvider." diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 4ce762714e864..6762b1d120d4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -243,6 +243,12 @@ class IncrementalExecution( checkpointLocation, tws.getStateInfo.operatorId.toString)) val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession, metadataPath.toString) + // check if old metadata is present. 
if it is, validate with this metadata + operatorStateMetadataLog.getLatest() match { + case Some((_, oldMetadata)) => + tws.validateMetadatas(oldMetadata, metadata) + case None => + } operatorStateMetadataLog.add(currentBatchId, metadata) case _ => val metadataWriter = new OperatorStateMetadataWriter(new Path( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 76cdbeedaafe7..f45399a21741c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.json4s.{DefaultFormats, JString} import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ -import org.json4s.JString +import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.broadcast.Broadcast @@ -431,10 +432,39 @@ case class TransformWithStateExec( } } - private def validateMetadatas( + def validateMetadatas( oldMetadata: OperatorStateMetadata, newMetadata: OperatorStateMetadata): Unit = { - + // if both metadatas are instance of OperatorStateMetadatV2 + (oldMetadata, newMetadata) match { + case (oldMetadataV2: OperatorStateMetadataV2, + newMetadataV2: OperatorStateMetadataV2) => + val oldJsonString = oldMetadataV2.operatorPropertiesJson + val newJsonString = newMetadataV2.operatorPropertiesJson + // verify that timeMode, outputMode are the same + implicit val formats: DefaultFormats.type = DefaultFormats + val oldJsonProps = JsonMethods.parse(oldJsonString).extract[Map[String, Any]] + val newJsonProps = JsonMethods.parse(newJsonString).extract[Map[String, Any]] + val oldTimeMode = oldJsonProps("timeMode").asInstanceOf[String] + val oldOutputMode = oldJsonProps("outputMode").asInstanceOf[String] + val newTimeMode = newJsonProps("timeMode").asInstanceOf[String] + val newOutputMode = newJsonProps("outputMode").asInstanceOf[String] + if (oldTimeMode != newTimeMode) { + throw new StateStoreInvalidConfigAfterRestart( + "timeMode", + oldTimeMode, + newTimeMode + ) + } + if (oldOutputMode != newOutputMode) { + throw new StateStoreInvalidConfigAfterRestart( + "outputMode", + oldOutputMode, + newOutputMode + ) + } + case (_, _) => + } } /** Metadata of this stateful operator and its states stores. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 4ac813291c00b..94fb286143f14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -173,8 +173,23 @@ object StateStoreErrors { StateStoreProviderDoesNotSupportFineGrainedReplay = { new StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass) } + + def invalidConfigChangedAfterRestart(configName: String, oldConfig: String, newConfig: String): + StateStoreInvalidConfigAfterRestart = { + new StateStoreInvalidConfigAfterRestart(configName, oldConfig, newConfig) + } } +class StateStoreInvalidConfigAfterRestart(configName: String, oldConfig: String, newConfig: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_INVALID_CONFIG_AFTER_RESTART", + messageParameters = Map( + "configName" -> configName, + "oldConfig" -> oldConfig, + "newConfig" -> newConfig + ) + ) + class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String) extends SparkUnsupportedOperationException( errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index b0c913549506b..508f82332fe2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.{Dataset, Encoders, Row} import org.apache.spark.sql.catalyst.util.stringToFile import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, StateStoreValueSchemaNotCompatible, TestClass} +import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreInvalidConfigAfterRestart, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, StateStoreValueSchemaNotCompatible, TestClass} import org.apache.spark.sql.functions.timestamp_seconds import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.StreamManualClock @@ -1175,6 +1175,42 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } + test("test that different timeMode, outputMode after query restart fails") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + 
SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) { + withTempDir { checkpointDir => + val inputData = MemoryStream[String] + val result1 = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessor(), + TimeMode.None(), + OutputMode.Update()) + + testStream(result1, OutputMode.Update())( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(inputData, "a"), + CheckNewAnswer(("a", "1")), + StopStream + ) + val result2 = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessor(), + TimeMode.None(), + OutputMode.Append()) + testStream(result2, OutputMode.Update())( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(inputData, "a"), + ExpectFailure[StateStoreInvalidConfigAfterRestart] { t => + assert(t.getMessage.contains("outputMode")) + assert(t.getMessage.contains("is not equal")) + } + ) + } + } + } + test("transformWithState - verify StateSchemaV3 writes correct SQL schema of key/value") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, From e0d8ffb7819ccb3c046f5572295421d356bc3ad1 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 15 Jul 2024 19:57:51 -0700 Subject: [PATCH 11/15] it works --- .../resources/error/error-conditions.json | 6 ++ .../streaming/ColumnFamilySchemaFactory.scala | 0 .../StatefulProcessorHandleImpl.scala | 25 ++++++ .../streaming/TransformWithStateExec.scala | 38 +++++++++- .../TransformWithStateVariableUtils.scala | 76 +++++++++++++++++++ .../streaming/state/StateStoreErrors.scala | 15 ++++ .../streaming/TransformWithStateSuite.scala | 63 ++++++++++++++- 7 files changed, 216 insertions(+), 7 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaFactory.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index dc20e411f4cfa..0afe09340b2a9 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3816,6 +3816,12 @@ ], "sqlState" : "42K06" }, + "STATE_STORE_INVALID_VARIABLE_TYPE_CHANGE" : { + "message" : [ + "Cannot change to between query restarts. Please set to , or restart with a new checkpoint directory." 
+ ], + "sqlState" : "42K06" + }, "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE" : { "message" : [ "The streaming query failed to validate written state for key row.", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaFactory.scala deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 65b435b5c692c..2eb90fe3b6436 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -302,18 +302,28 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + private[sql] val stateVariableUtils = TransformWithStateVariableUtils + // Because this is only happening on the driver side, there is only // one task modifying and accessing this map at a time private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = new mutable.HashMap[String, ColumnFamilySchema]() + private[sql] val stateVariableInfos: mutable.Map[String, TransformWithStateVariableInfo] = + new mutable.HashMap[String, TransformWithStateVariableInfo]() + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + def getStateVariableInfos: Map[String, TransformWithStateVariableInfo] = stateVariableInfos.toMap + override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = { verifyStateVarOperations("get_value_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. getValueStateSchema(stateName, keyExprEnc, valEncoder, false) columnFamilySchemas.put(stateName, colFamilySchema) + val stateVariableInfo = stateVariableUtils. + getValueState(stateName, ttlEnabled = false) + stateVariableInfos.put(stateName, stateVariableInfo) null.asInstanceOf[ValueState[T]] } @@ -325,6 +335,9 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi val colFamilySchema = columnFamilySchemaUtils. getValueStateSchema(stateName, keyExprEnc, valEncoder, true) columnFamilySchemas.put(stateName, colFamilySchema) + val stateVariableInfo = stateVariableUtils. + getValueState(stateName, ttlEnabled = true) + stateVariableInfos.put(stateName, stateVariableInfo) null.asInstanceOf[ValueState[T]] } @@ -333,6 +346,9 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi val colFamilySchema = columnFamilySchemaUtils. getListStateSchema(stateName, keyExprEnc, valEncoder, false) columnFamilySchemas.put(stateName, colFamilySchema) + val stateVariableInfo = stateVariableUtils. + getListState(stateName, ttlEnabled = false) + stateVariableInfos.put(stateName, stateVariableInfo) null.asInstanceOf[ListState[T]] } @@ -344,6 +360,9 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi val colFamilySchema = columnFamilySchemaUtils. getListStateSchema(stateName, keyExprEnc, valEncoder, true) columnFamilySchemas.put(stateName, colFamilySchema) + val stateVariableInfo = stateVariableUtils. 
+ getListState(stateName, ttlEnabled = true) + stateVariableInfos.put(stateName, stateVariableInfo) null.asInstanceOf[ListState[T]] } @@ -355,6 +374,9 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi val colFamilySchema = columnFamilySchemaUtils. getMapStateSchema(stateName, keyExprEnc, userKeyEnc, valEncoder, false) columnFamilySchemas.put(stateName, colFamilySchema) + val stateVariableInfo = stateVariableUtils. + getMapState(stateName, ttlEnabled = false) + stateVariableInfos.put(stateName, stateVariableInfo) null.asInstanceOf[MapState[K, V]] } @@ -367,6 +389,9 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi val colFamilySchema = columnFamilySchemaUtils. getMapStateSchema(stateName, keyExprEnc, valEncoder, userKeyEnc, true) columnFamilySchemas.put(stateName, colFamilySchema) + val stateVariableInfo = stateVariableUtils. + getMapState(stateName, ttlEnabled = true) + stateVariableInfos.put(stateName, stateVariableInfo) null.asInstanceOf[MapState[K, V]] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index f45399a21741c..d6e8ae60c6165 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -28,6 +28,7 @@ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -81,7 +82,8 @@ case class TransformWithStateExec( initialStateDataAttrs: Seq[Attribute], initialStateDeserializer: Expression, initialState: SparkPlan) - extends BinaryExecNode with StateStoreWriter with WatermarkSupport with ObjectProducerExec { + extends BinaryExecNode with StateStoreWriter with WatermarkSupport with ObjectProducerExec + with Logging { override def shortName: String = "transformWithStateExec" @@ -123,6 +125,12 @@ case class TransformWithStateExec( columnFamilySchemas } + private def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] = { + val stateVariableInfos = getDriverProcessorHandle().getStateVariableInfos + closeProcessorHandle() + stateVariableInfos + } + /** * This method is used for the driver-side stateful processor after we * have collected all the necessary schemas. @@ -450,19 +458,38 @@ case class TransformWithStateExec( val newTimeMode = newJsonProps("timeMode").asInstanceOf[String] val newOutputMode = newJsonProps("outputMode").asInstanceOf[String] if (oldTimeMode != newTimeMode) { - throw new StateStoreInvalidConfigAfterRestart( + throw StateStoreErrors.invalidConfigChangedAfterRestart( "timeMode", oldTimeMode, newTimeMode ) } if (oldOutputMode != newOutputMode) { - throw new StateStoreInvalidConfigAfterRestart( + throw StateStoreErrors.invalidConfigChangedAfterRestart( "outputMode", oldOutputMode, newOutputMode ) } + // compare state variable infos + val oldStateVariableInfos = oldJsonProps("stateVariables"). 
+ asInstanceOf[List[Map[String, Any]]] + .map(TransformWithStateVariableInfo.fromMap) + val newStateVariableInfos = getStateVariableInfos() + oldStateVariableInfos.foreach { oldInfo => + val newInfo = newStateVariableInfos.get(oldInfo.stateName) + newInfo match { + case Some(stateVarInfo) => + if (oldInfo.stateVariableType != stateVarInfo.stateVariableType) { + throw StateStoreErrors.invalidVariableTypeChange( + stateVarInfo.stateName, + oldInfo.stateVariableType.toString, + stateVarInfo.stateVariableType.toString + ) + } + case None => + } + } case (_, _) => } } @@ -479,7 +506,10 @@ case class TransformWithStateExec( val operatorPropertiesJson: JValue = ("timeMode" -> JString(timeMode.toString)) ~ - ("outputMode" -> JString(outputMode.toString)) + ("outputMode" -> JString(outputMode.toString)) ~ + ("stateVariables" -> getStateVariableInfos().map { case (_, stateInfo) => + stateInfo.jsonValue + }.arr) val json = compact(render(operatorPropertiesJson)) OperatorStateMetadataV2(operatorInfo, stateStoreInfo, json) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala new file mode 100644 index 0000000000000..3c8f0796c322f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import org.json4s.DefaultFormats +import org.json4s.JsonAST._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods +import org.json4s.jackson.JsonMethods.{compact, render} + +import org.apache.spark.sql.execution.streaming.StateVariableType.StateVariableType + +// Enum of possible State Variable types +object StateVariableType extends Enumeration { + type StateVariableType = Value + val ValueState, ListState, MapState = Value +} + +case class TransformWithStateVariableInfo( + stateName: String, + stateVariableType: StateVariableType, + ttlEnabled: Boolean) { + def jsonValue: JValue = { + ("stateName" -> JString(stateName)) ~ + ("stateVariableType" -> JString(stateVariableType.toString)) ~ + ("ttlEnabled" -> JBool(ttlEnabled)) + } + + def json: String = { + compact(render(jsonValue)) + } +} + +object TransformWithStateVariableInfo { + + def fromJson(json: String): TransformWithStateVariableInfo = { + implicit val formats: DefaultFormats.type = DefaultFormats + val parsed = JsonMethods.parse(json).extract[Map[String, Any]] + fromMap(parsed) + } + + def fromMap(map: Map[String, Any]): TransformWithStateVariableInfo = { + val stateName = map("stateName").asInstanceOf[String] + val stateVariableType = StateVariableType.withName( + map("stateVariableType").asInstanceOf[String]) + val ttlEnabled = map("ttlEnabled").asInstanceOf[Boolean] + TransformWithStateVariableInfo(stateName, stateVariableType, ttlEnabled) + } +} +object TransformWithStateVariableUtils { + def getValueState(stateName: String, ttlEnabled: Boolean): TransformWithStateVariableInfo = { + TransformWithStateVariableInfo(stateName, StateVariableType.ValueState, ttlEnabled) + } + + def getListState(stateName: String, ttlEnabled: Boolean): TransformWithStateVariableInfo = { + TransformWithStateVariableInfo(stateName, StateVariableType.ListState, ttlEnabled) + } + + def getMapState(stateName: String, ttlEnabled: Boolean): TransformWithStateVariableInfo = { + TransformWithStateVariableInfo(stateName, StateVariableType.MapState, ttlEnabled) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 94fb286143f14..0ea4d7a3a47d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -178,6 +178,11 @@ object StateStoreErrors { StateStoreInvalidConfigAfterRestart = { new StateStoreInvalidConfigAfterRestart(configName, oldConfig, newConfig) } + + def invalidVariableTypeChange(stateName: String, oldType: String, newType: String): + StateStoreInvalidVariableTypeChange = { + new StateStoreInvalidVariableTypeChange(stateName, oldType, newType) + } } class StateStoreInvalidConfigAfterRestart(configName: String, oldConfig: String, newConfig: String) @@ -190,6 +195,16 @@ class StateStoreInvalidConfigAfterRestart(configName: String, oldConfig: String, ) ) +class StateStoreInvalidVariableTypeChange(stateName: String, oldType: String, newType: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_INVALID_VARIABLE_TYPE_CHANGE", + messageParameters = Map( + "stateName" -> stateName, + "oldType" -> oldType, + "newType" -> newType + ) + ) + class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String) extends 
SparkUnsupportedOperationException( errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 508f82332fe2b..dea6adc9389a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.streaming import java.io.File import java.util.UUID - import org.apache.hadoop.fs.Path import org.apache.spark.SparkRuntimeException @@ -28,7 +27,7 @@ import org.apache.spark.sql.{Dataset, Encoders, Row} import org.apache.spark.sql.catalyst.util.stringToFile import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreInvalidConfigAfterRestart, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, StateStoreValueSchemaNotCompatible, TestClass} +import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StateSchemaV3File, StateStoreInvalidConfigAfterRestart, StateStoreInvalidVariableTypeChange, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, StateStoreValueSchemaNotCompatible, StatefulProcessorCannotPerformOperationWithInvalidHandleState, TestClass} import org.apache.spark.sql.functions.timestamp_seconds import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.StreamManualClock @@ -64,6 +63,29 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S } } +// Class to test that changing between Value and List State fails +// between query runs +class RunningCountListStatefulProcessor + extends StatefulProcessor[String, String, (String, String)] + with Logging { + @transient protected var _countState: ListState[Long] = _ + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { + _countState = getHandle.getListState[Long]( + "countState", Encoders.scalaLong) + } + + override def handleInputRows( + key: String, + inputRows: Iterator[String], + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = { + Iterator.empty + } +} + class RunningCountStatefulProcessorInt extends StatefulProcessor[String, String, (String, String)] with Logging { @transient protected var _countState: ValueState[Int] = _ @@ -1175,7 +1197,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } - test("test that different timeMode, outputMode after query restart fails") { + test("test that different outputMode after query restart fails") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, SQLConf.SHUFFLE_PARTITIONS.key -> @@ -1211,6 
+1233,41 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } + test("test that changing between different state variable types fails") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) { + withTempDir { checkpointDir => + val inputData = MemoryStream[String] + val result = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessor(), + TimeMode.None(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(inputData, "a"), + CheckNewAnswer(("a", "1")), + StopStream + ) + val result2 = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new RunningCountListStatefulProcessor(), + TimeMode.None(), + OutputMode.Update()) + testStream(result2, OutputMode.Update())( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(inputData, "a"), + ExpectFailure[StateStoreInvalidVariableTypeChange] { t => + assert(t.getMessage.contains("Cannot change countState")) + } + ) + } + } + } + test("transformWithState - verify StateSchemaV3 writes correct SQL schema of key/value") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, From 948b2f75f2afaf783cd1f3692044757987b09cd5 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 15 Jul 2024 20:09:08 -0700 Subject: [PATCH 12/15] it compiles lol --- .../src/main/resources/error/error-conditions.json | 6 ++++++ .../streaming/StatefulProcessorHandleImpl.scala | 12 ++++++++++++ .../execution/streaming/state/StateStoreErrors.scala | 12 ++++++++++++ .../sql/streaming/TransformWithStateSuite.scala | 3 ++- 4 files changed, 32 insertions(+), 1 deletion(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 0afe09340b2a9..8c3dd81d8542f 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3785,6 +3785,12 @@ ], "sqlState" : "42802" }, + "STATE_STORE_DUPLICATE_STATE_VARIABLE_DEFINED" : { + "message" : [ + "State variable with name has already been defined in the StatefulProcessor." 
+ ], + "sqlState" : "42802" + }, "STATE_STORE_HANDLE_NOT_INITIALIZED" : { "message" : [ "The handle has not been initialized for this StatefulProcessor.", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 2eb90fe3b6436..ec2df52ad6e84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -316,10 +316,17 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi def getStateVariableInfos: Map[String, TransformWithStateVariableInfo] = stateVariableInfos.toMap + def checkIfDuplicateVariableDefined(stateName: String): Unit = { + if (columnFamilySchemas.contains(stateName)) { + throw StateStoreErrors.duplicateStateVariableDefined(stateName) + } + } + override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = { verifyStateVarOperations("get_value_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. getValueStateSchema(stateName, keyExprEnc, valEncoder, false) + checkIfDuplicateVariableDefined(stateName) columnFamilySchemas.put(stateName, colFamilySchema) val stateVariableInfo = stateVariableUtils. getValueState(stateName, ttlEnabled = false) @@ -334,6 +341,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi verifyStateVarOperations("get_value_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. getValueStateSchema(stateName, keyExprEnc, valEncoder, true) + checkIfDuplicateVariableDefined(stateName) columnFamilySchemas.put(stateName, colFamilySchema) val stateVariableInfo = stateVariableUtils. getValueState(stateName, ttlEnabled = true) @@ -345,6 +353,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi verifyStateVarOperations("get_list_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. getListStateSchema(stateName, keyExprEnc, valEncoder, false) + checkIfDuplicateVariableDefined(stateName) columnFamilySchemas.put(stateName, colFamilySchema) val stateVariableInfo = stateVariableUtils. getListState(stateName, ttlEnabled = false) @@ -359,6 +368,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi verifyStateVarOperations("get_list_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. getListStateSchema(stateName, keyExprEnc, valEncoder, true) + checkIfDuplicateVariableDefined(stateName) columnFamilySchemas.put(stateName, colFamilySchema) val stateVariableInfo = stateVariableUtils. getListState(stateName, ttlEnabled = true) @@ -373,6 +383,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi verifyStateVarOperations("get_map_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. getMapStateSchema(stateName, keyExprEnc, userKeyEnc, valEncoder, false) + checkIfDuplicateVariableDefined(stateName) columnFamilySchemas.put(stateName, colFamilySchema) val stateVariableInfo = stateVariableUtils. getMapState(stateName, ttlEnabled = false) @@ -388,6 +399,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi verifyStateVarOperations("get_map_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. 
getMapStateSchema(stateName, keyExprEnc, valEncoder, userKeyEnc, true) + checkIfDuplicateVariableDefined(stateName) columnFamilySchemas.put(stateName, colFamilySchema) val stateVariableInfo = stateVariableUtils. getMapState(stateName, ttlEnabled = true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 0ea4d7a3a47d3..6057832aca694 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -179,11 +179,23 @@ object StateStoreErrors { new StateStoreInvalidConfigAfterRestart(configName, oldConfig, newConfig) } + def duplicateStateVariableDefined(stateName: String): + StateStoreDuplicateStateVariableDefined = { + new StateStoreDuplicateStateVariableDefined(stateName) + } + def invalidVariableTypeChange(stateName: String, oldType: String, newType: String): StateStoreInvalidVariableTypeChange = { new StateStoreInvalidVariableTypeChange(stateName, oldType, newType) } } +class StateStoreDuplicateStateVariableDefined(stateName: String) + extends SparkRuntimeException( + errorClass = "STATE_STORE_DUPLICATE_STATE_VARIABLE_DEFINED", + messageParameters = Map( + "stateName" -> stateName + ) + ) class StateStoreInvalidConfigAfterRestart(configName: String, oldConfig: String, newConfig: String) extends SparkUnsupportedOperationException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index dea6adc9389a7..4d5095236bfe9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming import java.io.File import java.util.UUID + import org.apache.hadoop.fs.Path import org.apache.spark.SparkRuntimeException @@ -27,7 +28,7 @@ import org.apache.spark.sql.{Dataset, Encoders, Row} import org.apache.spark.sql.catalyst.util.stringToFile import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StateSchemaV3File, StateStoreInvalidConfigAfterRestart, StateStoreInvalidVariableTypeChange, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, StateStoreValueSchemaNotCompatible, StatefulProcessorCannotPerformOperationWithInvalidHandleState, TestClass} +import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreInvalidConfigAfterRestart, StateStoreInvalidVariableTypeChange, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, 
StateStoreValueSchemaNotCompatible, TestClass} import org.apache.spark.sql.functions.timestamp_seconds import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.StreamManualClock From 405eb555957c22f41d4b5719493ef8ca569e8ca3 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 15 Jul 2024 20:12:56 -0700 Subject: [PATCH 13/15] removing unnecessary files --- .../streaming/StateSchemaV3File.scala | 100 ------------------ .../streaming/TransformWithStateExec.scala | 3 +- 2 files changed, 1 insertion(+), 102 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala deleted file mode 100644 index 82bab9a5301f0..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import java.io.{InputStream, OutputStream, StringReader} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream} -import org.json4s.JValue -import org.json4s.jackson.JsonMethods -import org.json4s.jackson.JsonMethods.{compact, render} - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf - -class StateSchemaV3File( - hadoopConf: Configuration, - path: String, - metadataCacheEnabled: Boolean = false) - extends HDFSMetadataLog[JValue](hadoopConf, path, metadataCacheEnabled) { - - final val MAX_UTF_CHUNK_SIZE = 65535 - def this(sparkSession: SparkSession, path: String) = { - this( - sparkSession.sessionState.newHadoopConf(), - path, - metadataCacheEnabled = sparkSession.sessionState.conf.getConf( - SQLConf.STREAMING_METADATA_CACHE_ENABLED) - ) - } - - override protected def serialize(schema: JValue, out: OutputStream): Unit = { - val json = compact(render(schema)) - val buf = new Array[Char](MAX_UTF_CHUNK_SIZE) - - val outputStream = out.asInstanceOf[FSDataOutputStream] - // DataOutputStream.writeUTF can't write a string at once - // if the size exceeds 65535 (2^16 - 1) bytes. - // Each metadata consists of multiple chunks in schema version 3. 
- try { - val numMetadataChunks = (json.length - 1) / MAX_UTF_CHUNK_SIZE + 1 - val metadataStringReader = new StringReader(json) - outputStream.writeInt(numMetadataChunks) - (0 until numMetadataChunks).foreach { _ => - val numRead = metadataStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE) - outputStream.writeUTF(new String(buf, 0, numRead)) - } - outputStream.close() - } catch { - case e: Throwable => - throw e - } - } - - override protected def deserialize(in: InputStream): JValue = { - val buf = new StringBuilder - val inputStream = in.asInstanceOf[FSDataInputStream] - val numKeyChunks = inputStream.readInt() - (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF())) - val json = buf.toString() - JsonMethods.parse(json) - } - - override def add(batchId: Long, metadata: JValue): Boolean = { - require(metadata != null, "'null' metadata cannot written to a metadata log") - val batchMetadataFile = batchIdToPath(batchId) - if (fileManager.exists(batchMetadataFile)) { - fileManager.delete(batchMetadataFile) - } - val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) } - if (metadataCacheEnabled && res) batchCache.put(batchId, metadata) - res - } - - override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { - val batchMetadataFile = batchIdToPath(batchId) - - if (metadataCacheEnabled && batchCache.containsKey(batchId)) { - false - } else { - write(batchMetadataFile, fn) - true - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index d6e8ae60c6165..40408036e158b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -82,8 +82,7 @@ case class TransformWithStateExec( initialStateDataAttrs: Seq[Attribute], initialStateDeserializer: Expression, initialState: SparkPlan) - extends BinaryExecNode with StateStoreWriter with WatermarkSupport with ObjectProducerExec - with Logging { + extends BinaryExecNode with StateStoreWriter with WatermarkSupport with ObjectProducerExec { override def shortName: String = "transformWithStateExec" From 96889053e0122ce78f6d9514efa46d23bafb046b Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 15 Jul 2024 20:41:57 -0700 Subject: [PATCH 14/15] validation --- .../streaming/TransformWithStateExec.scala | 97 ++++++++++--------- .../streaming/TransformWithStateSuite.scala | 11 ++- 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 40408036e158b..56a1387701a4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -28,7 +28,6 @@ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -439,56 +438,62 @@ case class TransformWithStateExec( } } + private def 
+      fieldName: String,
+      oldMetadataV2: OperatorStateMetadataV2,
+      newMetadataV2: OperatorStateMetadataV2): Unit = {
+    val oldJsonString = oldMetadataV2.operatorPropertiesJson
+    val newJsonString = newMetadataV2.operatorPropertiesJson
+    // verify that timeMode, outputMode are the same
+    implicit val formats: DefaultFormats.type = DefaultFormats
+    val oldJsonProps = JsonMethods.parse(oldJsonString).extract[Map[String, Any]]
+    val newJsonProps = JsonMethods.parse(newJsonString).extract[Map[String, Any]]
+    val oldProp = oldJsonProps(fieldName).asInstanceOf[T]
+    val newProp = newJsonProps(fieldName).asInstanceOf[T]
+    if (oldProp != newProp) {
+      throw StateStoreErrors.invalidConfigChangedAfterRestart(
+        fieldName,
+        oldProp.toString,
+        newProp.toString
+      )
+    }
+  }
+
+  private def checkStateVariableEquality(oldMetadataV2: OperatorStateMetadataV2): Unit = {
+    val oldJsonString = oldMetadataV2.operatorPropertiesJson
+    implicit val formats: DefaultFormats.type = DefaultFormats
+    val oldJsonProps = JsonMethods.parse(oldJsonString).extract[Map[String, Any]]
+    // compare state variable infos
+    val oldStateVariableInfos = oldJsonProps("stateVariables").
+      asInstanceOf[List[Map[String, Any]]]
+      .map(TransformWithStateVariableInfo.fromMap)
+    val newStateVariableInfos = getStateVariableInfos()
+    oldStateVariableInfos.foreach { oldInfo =>
+      val newInfo = newStateVariableInfos.get(oldInfo.stateName)
+      newInfo match {
+        case Some(stateVarInfo) =>
+          if (oldInfo.stateVariableType != stateVarInfo.stateVariableType) {
+            throw StateStoreErrors.invalidVariableTypeChange(
+              stateVarInfo.stateName,
+              oldInfo.stateVariableType.toString,
+              stateVarInfo.stateVariableType.toString
+            )
+          }
+        case None =>
+      }
+    }
+  }
+
   def validateMetadatas(
       oldMetadata: OperatorStateMetadata,
       newMetadata: OperatorStateMetadata): Unit = {
-    // if both metadatas are instance of OperatorStateMetadatV2
     (oldMetadata, newMetadata) match {
-      case (oldMetadataV2: OperatorStateMetadataV2,
+      case (
+        oldMetadataV2: OperatorStateMetadataV2,
         newMetadataV2: OperatorStateMetadataV2) =>
-        val oldJsonString = oldMetadataV2.operatorPropertiesJson
-        val newJsonString = newMetadataV2.operatorPropertiesJson
-        // verify that timeMode, outputMode are the same
-        implicit val formats: DefaultFormats.type = DefaultFormats
-        val oldJsonProps = JsonMethods.parse(oldJsonString).extract[Map[String, Any]]
-        val newJsonProps = JsonMethods.parse(newJsonString).extract[Map[String, Any]]
-        val oldTimeMode = oldJsonProps("timeMode").asInstanceOf[String]
-        val oldOutputMode = oldJsonProps("outputMode").asInstanceOf[String]
-        val newTimeMode = newJsonProps("timeMode").asInstanceOf[String]
-        val newOutputMode = newJsonProps("outputMode").asInstanceOf[String]
-        if (oldTimeMode != newTimeMode) {
-          throw StateStoreErrors.invalidConfigChangedAfterRestart(
-            "timeMode",
-            oldTimeMode,
-            newTimeMode
-          )
-        }
-        if (oldOutputMode != newOutputMode) {
-          throw StateStoreErrors.invalidConfigChangedAfterRestart(
-            "outputMode",
-            oldOutputMode,
-            newOutputMode
-          )
-        }
-        // compare state variable infos
-        val oldStateVariableInfos = oldJsonProps("stateVariables").
-          asInstanceOf[List[Map[String, Any]]]
-          .map(TransformWithStateVariableInfo.fromMap)
-        val newStateVariableInfos = getStateVariableInfos()
-        oldStateVariableInfos.foreach { oldInfo =>
-          val newInfo = newStateVariableInfos.get(oldInfo.stateName)
-          newInfo match {
-            case Some(stateVarInfo) =>
-              if (oldInfo.stateVariableType != stateVarInfo.stateVariableType) {
-                throw StateStoreErrors.invalidVariableTypeChange(
-                  stateVarInfo.stateName,
-                  oldInfo.stateVariableType.toString,
-                  stateVarInfo.stateVariableType.toString
-                )
-              }
-            case None =>
-          }
-        }
+        checkOperatorPropEquality[String]("timeMode", oldMetadataV2, newMetadataV2)
+        checkOperatorPropEquality[String]("outputMode", oldMetadataV2, newMetadataV2)
+        checkStateVariableEquality(oldMetadataV2)
       case (_, _) =>
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 4d5095236bfe9..18cbb53d3b29d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -973,7 +973,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
   }
 
   test("transformWithState - verify that OperatorStateMetadataV2" +
-    " file is being written correctly") {
+    " integrates with state-metadata source") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1002,17 +1002,20 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         Row(0, "transformWithStateExec", "default", 5, 0L, 0L),
         Row(0, "transformWithStateExec", "default", 5, 1L, 1L)
       ))
+      // need line to be unbroken, otherwise the test will fail.
+      // scalastyle:off
+      val expectedAnswer = """{"timeMode":"NoTime","outputMode":"Update","stateVariables":[{"stateName":"countState","stateVariableType":"ValueState","ttlEnabled":false}]}"""
+      // scalastyle:on
       checkAnswer(df.select(df.metadataColumn("_operatorProperties")),
         Seq(
-          Row("""{"timeMode":"NoTime","outputMode":"Update"}"""),
-          Row("""{"timeMode":"NoTime","outputMode":"Update"}""")
+          Row(expectedAnswer),
+          Row(expectedAnswer)
         )
       )
     }
   }
 }
-
   test("transformWithState - verify that metadata logs are purged") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,

From 654133c3c6729aaf021f548ca8c85a61bf48dd81 Mon Sep 17 00:00:00 2001
From: Eric Marnadi
Date: Tue, 16 Jul 2024 08:39:21 -0700
Subject: [PATCH 15/15] removing old code

---
 .../sql/execution/streaming/MapStateImplWithTTL.scala | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala
index 1d3c8e7a9747b..2ab06f36dd5f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala
@@ -24,16 +24,6 @@ import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoder
 import org.apache.spark.sql.streaming.{MapState, TTLConfig}
 import org.apache.spark.util.NextIterator
 
-object MapStateImplWithTTL {
-  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
-    new ColumnFamilySchemaV1(
-      stateName,
-      COMPOSITE_KEY_ROW_SCHEMA,
-      VALUE_ROW_SCHEMA_WITH_TTL,
-      PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false)
-  }
-}
-
 /**
  * Class that provides a concrete implementation for map state associated with state
  * variables (with ttl expiration support) used in the streaming transformWithState operator.