SQLConf.scala
@@ -2230,6 +2230,17 @@ object SQLConf {
       .intConf
       .createWithDefault(1)

+  val STREAMING_STATE_STORE_ENCODING_FORMAT =
+    buildConf("spark.sql.streaming.stateStore.encodingFormat")
+      .doc("The encoding format used for stateful operators to store information " +
+        "in the state store")
+      .version("4.0.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValue(v => Set("unsaferow", "avro").contains(v),
+        "Valid values are 'unsaferow' and 'avro'")
+      .createWithDefault("unsaferow")
+
   val STATE_STORE_COMPRESSION_CODEC =
     buildConf("spark.sql.streaming.stateStore.compression.codec")
       .internal()
@@ -5607,6 +5618,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

   def stateStoreCheckpointFormatVersion: Int = getConf(STATE_STORE_CHECKPOINT_FORMAT_VERSION)

+  def stateStoreEncodingFormat: String = getConf(STREAMING_STATE_STORE_ENCODING_FORMAT)
+
   def checkpointRenamedFileCheck: Boolean = getConf(CHECKPOINT_RENAMEDFILE_CHECK_ENABLED)

   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
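A minimal sketch of how a user would opt into the new Avro encoding. The session setup below is illustrative; only the config key, its valid values, and the case-insensitive transform come from this diff:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("avro-state-encoding-demo")
  .master("local[2]")
  // Valid values per the checkValue above: "unsaferow" (the default) and "avro".
  .config("spark.sql.streaming.stateStore.encodingFormat", "avro")
  .getOrCreate()

// The .transform(_.toLowerCase(Locale.ROOT)) call makes the value case-insensitive:
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "AVRO") // also accepted

// An unsupported value fails fast with the checkValue message
// "Valid values are 'unsaferow' and 'avro'":
// spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "json")

Stateful operators then read the choice through the stateStoreEncodingFormat getter added above.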
StateStoreColumnFamilySchemaUtils.scala
@@ -20,10 +20,49 @@ import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
 import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._

 object StateStoreColumnFamilySchemaUtils {
+
+  /**
+   * Avro uses zig-zag encoding for some fixed-length types, like Longs and Ints. For range scans
+   * we want to use big-endian encoding, so we need to convert the source schema to replace these
+   * types with BinaryType.
+   *
+   * @param schema The schema to convert
+   * @param ordinals If non-empty, only convert fields at these ordinals.
+   *                 If empty, convert all fields.
+   */
+  def convertForRangeScan(schema: StructType, ordinals: Seq[Int] = Seq.empty): StructType = {
+    val ordinalSet = ordinals.toSet
+
+    StructType(schema.fields.zipWithIndex.flatMap { case (field, idx) =>
+      if ((ordinals.isEmpty || ordinalSet.contains(idx)) && isFixedSize(field.dataType)) {
+        // For each numeric field, create two fields:
+        // 1. Byte marker for null, positive, or negative values
+        // 2. The original numeric value in big-endian format
+        // Byte type is converted to Int in Avro, which doesn't work for us as Avro
+        // uses zig-zag encoding as opposed to big-endian for Ints
+        Seq(
+          StructField(s"${field.name}_marker", BinaryType, nullable = false),
+          field.copy(name = s"${field.name}_value", BinaryType)
+        )
+      } else {
+        Seq(field)
+      }
+    })
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+         _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  def getTtlColFamilyName(stateName: String): String = {
+    "$ttl_" + stateName
+  }

   def getValueStateSchema[T](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
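To see why the rewrite to marker-plus-binary pairs restores ordering, here is a standalone sketch (not code from this PR) of the idea for LongType. The marker constants and helper names are made up for illustration; only the marker/big-endian design comes from the diff:

import java.nio.ByteBuffer

object RangeScanEncodingDemo {
  // Hypothetical markers: nulls sort first, then negatives, then non-negatives.
  // The real encoder's constants may differ.
  def encodeLong(v: java.lang.Long): Array[Byte] = {
    if (v == null) Array(0.toByte)
    else {
      val marker = if (v < 0) 1.toByte else 2.toByte
      // Two's-complement big-endian bytes compare lexicographically in numeric
      // order within a sign group, which Avro's zig-zag varints do not guarantee.
      marker +: ByteBuffer.allocate(8).putLong(v).array()
    }
  }

  // Unsigned lexicographic comparison, i.e. what a byte-wise range scan sees.
  def lexCompare(a: Array[Byte], b: Array[Byte]): Int =
    a.zip(b).map { case (x, y) => (x & 0xff) - (y & 0xff) }
      .find(_ != 0)
      .getOrElse(a.length - b.length)

  def main(args: Array[String]): Unit = {
    val encoded = Seq[java.lang.Long](null, -5L, -1L, 0L, 7L).map(encodeLong)
    // Byte order agrees with numeric order, nulls first:
    assert(encoded.sliding(2).forall { case Seq(x, y) => lexCompare(x, y) < 0 })
  }
}

Applied to a schema with a single LongType field named ts, convertForRangeScan above yields ts_marker and ts_value, both BinaryType, carrying exactly these two pieces.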
StreamExecution.scala
@@ -715,6 +715,7 @@ abstract class StreamExecution(

 object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
+  val RUN_ID_KEY = "sql.streaming.runId"
   val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
   val IO_EXCEPTION_NAMES = Seq(
     classOf[InterruptedException].getName,
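QUERY_ID_KEY is set as a SparkContext local property so that jobs spawned by a streaming query can be attributed to it; RUN_ID_KEY presumably mirrors that for the run ID, which changes on every restart. A sketch of how a listener could consume both properties; the listener class is hypothetical, and only the two key strings come from this file:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Hypothetical listener, not part of this diff.
class StreamingJobTagger extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Local properties set on the driver thread are captured per job.
    val queryId = Option(jobStart.properties.getProperty("sql.streaming.queryId"))
    val runId = Option(jobStart.properties.getProperty("sql.streaming.runId"))
    for (q <- queryId; r <- runId) {
      println(s"Job ${jobStart.jobId} belongs to query $q, run $r")
    }
  }
}

// Usage: spark.sparkContext.addSparkListener(new StreamingJobTagger)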