Skip to content

Commit 4bf5fee

Browse files
jingz-db and attilapiros
authored and committed
[SPARK-48742][SS] Virtual Column Family for RocksDB
### What changes were proposed in this pull request? Introducing a virtual column family to RocksDB. We attach a 2-byte Id prefix as a column family identifier to each key row that is put into RocksDB. The encoding and decoding of the virtual column family prefix happen at the `RocksDBKeyEncoder` layer, as we can pre-allocate the extra 2 bytes and avoid an additional memcpy. - Remove physical column family related code, as this becomes potentially dead code until some caller starts using it. - Remove `useColumnFamilies` from the `StateStoreChangelogV2` API. ### Why are the changes needed? Currently, within the scope of the arbitrary stateful API v2 (transformWithState) project, each state variable is stored inside one [physical column family](https://github.com/facebook/rocksdb/wiki/Column-Families) within the RocksDB state store instance. Column families are also used to implement secondary indexes for various features. Each physical column family has its own memtables, creates its own SST files, and handles compaction independently on those independent SST files. When the number of operations to RocksDB is relatively small and the number of column families is relatively large, the overhead of handling small SST files becomes high, especially since all of these have to be uploaded to the snapshot dir and referenced in the metadata file for the uploaded RocksDB snapshot. Using a prefix to manage different key spaces / virtual column families can reduce such overheads. ### Does this PR introduce _any_ user-facing change? No. If `useColumnFamilies` is set to true in `StateStore.init()`, the virtual column family will be used. ### How was this patch tested? Unit tests in `RocksDBStateStoreSuite`, and integration tests in `TransformWithStateSuite`. Moved test suites in `RocksDBSuite` into `RocksDBStateStoreSuite` because some previous verification functions are now moved into `RocksDBStateProvider`. ### Was this patch authored or co-authored using generative AI tooling?
No. Closes apache#47107 from jingz-db/virtual-col-family. Lead-authored-by: jingz-db <[email protected]> Co-authored-by: Jing Zhan <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 07a07e4 commit 4bf5fee

File tree

8 files changed

+724
-738
lines changed

8 files changed

+724
-738
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 29 additions & 221 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ import java.util.concurrent.TimeUnit
2323
import javax.annotation.concurrent.GuardedBy
2424

2525
import scala.collection.{mutable, Map}
26-
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
27-
import scala.jdk.CollectionConverters._
26+
import scala.collection.mutable.ListBuffer
2827
import scala.ref.WeakReference
2928
import scala.util.Try
3029

@@ -64,7 +63,6 @@ case object StoreMaintenance extends RocksDBOpType("store_maintenance")
6463
* @param localRootDir Root directory in local disk that is used to working and checkpointing dirs
6564
* @param hadoopConf Hadoop configuration for talking to the remote file system
6665
* @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
67-
* @param useColumnFamilies Used to determine whether a single or multiple column families are used
6866
*/
6967
class RocksDB(
7068
dfsRootDir: String,
@@ -141,11 +139,6 @@ class RocksDB(
141139
dbOptions.setWriteBufferManager(writeBufferManager)
142140
}
143141

144-
// Maintain mapping of column family name to handle
145-
@GuardedBy("acquireLock")
146-
private val colFamilyNameToHandleMap =
147-
scala.collection.mutable.Map[String, ColumnFamilyHandle]()
148-
149142
private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
150143
dbOptions.setStatistics(new Statistics())
151144
private val nativeStats = dbOptions.statistics()
@@ -318,20 +311,16 @@ class RocksDB(
318311
var changelogReader: StateStoreChangelogReader = null
319312
try {
320313
changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
321-
changelogReader.foreach { case (recordType, key, value, colFamilyName) =>
322-
if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
323-
createColFamilyIfAbsent(colFamilyName, checkInternalColumnFamilies(colFamilyName))
324-
}
325-
314+
changelogReader.foreach { case (recordType, key, value) =>
326315
recordType match {
327316
case RecordType.PUT_RECORD =>
328-
put(key, value, colFamilyName)
317+
put(key, value)
329318

330319
case RecordType.DELETE_RECORD =>
331-
remove(key, colFamilyName)
320+
remove(key)
332321

333322
case RecordType.MERGE_RECORD =>
334-
merge(key, value, colFamilyName)
323+
merge(key, value)
335324
}
336325
}
337326
} finally {
@@ -341,145 +330,28 @@ class RocksDB(
341330
loadedVersion = endVersion
342331
}
343332

344-
/**
345-
* Function to check if the column family exists in the state store instance.
346-
* @param colFamilyName - name of the column family
347-
* @return - true if the column family exists, false otherwise
348-
*/
349-
private def checkColFamilyExists(colFamilyName: String): Boolean = {
350-
colFamilyNameToHandleMap.contains(colFamilyName)
351-
}
352-
353-
private val multColFamiliesDisabledStr = "multiple column families disabled in " +
354-
"RocksDBStateStoreProvider"
355-
356-
/**
357-
* Function to verify invariants for column family based operations such as get, put, remove etc.
358-
* @param operationName - name of the store operation
359-
* @param colFamilyName - name of the column family
360-
*/
361-
private def verifyColFamilyOperations(
362-
operationName: String,
363-
colFamilyName: String): Unit = {
364-
if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
365-
// if the state store instance does not support multiple column families, throw an exception
366-
if (!useColumnFamilies) {
367-
throw StateStoreErrors.unsupportedOperationException(operationName,
368-
multColFamiliesDisabledStr)
369-
}
370-
371-
// if the column family name is empty or contains leading/trailing whitespaces, throw an
372-
// exception
373-
if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) {
374-
throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
375-
}
376-
377-
// if the column family does not exist, throw an exception
378-
if (!checkColFamilyExists(colFamilyName)) {
379-
throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
380-
colFamilyName)
381-
}
382-
}
383-
}
384-
385-
/**
386-
* Function to verify invariants for column family creation or deletion operations.
387-
* @param operationName - name of the store operation
388-
* @param colFamilyName - name of the column family
389-
*/
390-
private def verifyColFamilyCreationOrDeletion(
391-
operationName: String,
392-
colFamilyName: String,
393-
isInternal: Boolean = false): Unit = {
394-
// if the state store instance does not support multiple column families, throw an exception
395-
if (!useColumnFamilies) {
396-
throw StateStoreErrors.unsupportedOperationException(operationName,
397-
multColFamiliesDisabledStr)
398-
}
399-
400-
// if the column family name is empty or contains leading/trailing whitespaces
401-
// or using the reserved "default" column family, throw an exception
402-
if (colFamilyName.isEmpty
403-
|| colFamilyName.trim != colFamilyName
404-
|| colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
405-
throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
406-
}
407-
408-
// if the column family is not internal and uses reserved characters, throw an exception
409-
if (!isInternal && colFamilyName.charAt(0) == '_') {
410-
throw StateStoreErrors.cannotCreateColumnFamilyWithReservedChars(colFamilyName)
411-
}
412-
}
413-
414-
/**
415-
* Check whether the column family name is for internal column families.
416-
* @param cfName - column family name
417-
* @return - true if the column family is for internal use, false otherwise
418-
*/
419-
private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'
420-
421-
/**
422-
* Create RocksDB column family, if not created already
423-
*/
424-
def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Unit = {
425-
verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal)
426-
if (!checkColFamilyExists(colFamilyName)) {
427-
assert(db != null)
428-
val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, columnFamilyOptions)
429-
val handle = db.createColumnFamily(descriptor)
430-
colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
431-
}
432-
}
433-
434-
/**
435-
* Remove RocksDB column family, if exists
436-
*/
437-
def removeColFamilyIfExists(colFamilyName: String): Boolean = {
438-
verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName)
439-
if (checkColFamilyExists(colFamilyName)) {
440-
assert(db != null)
441-
val handle = colFamilyNameToHandleMap(colFamilyName)
442-
db.dropColumnFamily(handle)
443-
colFamilyNameToHandleMap.remove(colFamilyName)
444-
true
445-
} else {
446-
false
447-
}
448-
}
449-
450333
/**
451334
* Get the value for the given key if present, or null.
452335
* @note This will return the last written value even if it was uncommitted.
453336
*/
454-
def get(
455-
key: Array[Byte],
456-
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] = {
457-
verifyColFamilyOperations("get", colFamilyName)
458-
db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
337+
def get(key: Array[Byte]): Array[Byte] = {
338+
db.get(readOptions, key)
459339
}
460340

461341
/**
462342
* Put the given value for the given key.
463343
* @note This update is not committed to disk until commit() is called.
464344
*/
465-
def put(
466-
key: Array[Byte],
467-
value: Array[Byte],
468-
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
469-
verifyColFamilyOperations("put", colFamilyName)
345+
def put(key: Array[Byte], value: Array[Byte]): Unit = {
470346
if (conf.trackTotalNumberOfRows) {
471-
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
347+
val oldValue = db.get(readOptions, key)
472348
if (oldValue == null) {
473349
numKeysOnWritingVersion += 1
474350
}
475351
}
476352

477-
db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
478-
if (useColumnFamilies) {
479-
changelogWriter.foreach(_.put(key, value, colFamilyName))
480-
} else {
481-
changelogWriter.foreach(_.put(key, value))
482-
}
353+
db.put(writeOptions, key, value)
354+
changelogWriter.foreach(_.put(key, value))
483355
}
484356

485357
/**
@@ -493,57 +365,40 @@ class RocksDB(
493365
*
494366
* @note This update is not committed to disk until commit() is called.
495367
*/
496-
def merge(
497-
key: Array[Byte],
498-
value: Array[Byte],
499-
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
500-
if (!useColumnFamilies) {
501-
throw StateStoreErrors.unsupportedOperationException("merge",
502-
multColFamiliesDisabledStr)
503-
}
504-
verifyColFamilyOperations("merge", colFamilyName)
368+
def merge(key: Array[Byte], value: Array[Byte]): Unit = {
505369

506370
if (conf.trackTotalNumberOfRows) {
507-
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
371+
val oldValue = db.get(readOptions, key)
508372
if (oldValue == null) {
509373
numKeysOnWritingVersion += 1
510374
}
511375
}
512-
db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
376+
db.merge(writeOptions, key, value)
513377

514-
changelogWriter.foreach(_.merge(key, value, colFamilyName))
378+
changelogWriter.foreach(_.merge(key, value))
515379
}
516380

517381
/**
518382
* Remove the key if present.
519383
* @note This update is not committed to disk until commit() is called.
520384
*/
521-
def remove(
522-
key: Array[Byte],
523-
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
524-
verifyColFamilyOperations("remove", colFamilyName)
385+
def remove(key: Array[Byte]): Unit = {
525386
if (conf.trackTotalNumberOfRows) {
526-
val value = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
387+
val value = db.get(readOptions, key)
527388
if (value != null) {
528389
numKeysOnWritingVersion -= 1
529390
}
530391
}
531-
db.delete(colFamilyNameToHandleMap(colFamilyName), writeOptions, key)
532-
if (useColumnFamilies) {
533-
changelogWriter.foreach(_.delete(key, colFamilyName))
534-
} else {
535-
changelogWriter.foreach(_.delete(key))
536-
}
392+
db.delete(writeOptions, key)
393+
changelogWriter.foreach(_.delete(key))
537394
}
538395

539396
/**
540397
* Get an iterator of all committed and uncommitted key-value pairs.
541398
*/
542-
def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
543-
Iterator[ByteArrayPair] = {
544-
verifyColFamilyOperations("iterator", colFamilyName)
399+
def iterator(): Iterator[ByteArrayPair] = {
545400

546-
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
401+
val iter = db.newIterator()
547402
logInfo(log"Getting iterator from version ${MDC(LogKeys.LOADED_VERSION, loadedVersion)}")
548403
iter.seekToFirst()
549404

@@ -569,9 +424,8 @@ class RocksDB(
569424
}
570425
}
571426

572-
private def countKeys(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Long = {
573-
verifyColFamilyOperations("countKeys", colFamilyName)
574-
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
427+
private def countKeys(): Long = {
428+
val iter = db.newIterator()
575429

576430
try {
577431
logInfo(log"Counting keys - getting iterator from version " +
@@ -591,10 +445,8 @@ class RocksDB(
591445
}
592446
}
593447

594-
def prefixScan(prefix: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
595-
Iterator[ByteArrayPair] = {
596-
verifyColFamilyOperations("prefixScan", colFamilyName)
597-
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
448+
def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
449+
val iter = db.newIterator()
598450
iter.seek(prefix)
599451

600452
// Attempt to close this iterator if there is a task failure, or a task interruption.
@@ -639,17 +491,13 @@ class RocksDB(
639491
// because rocksdb wal is disabled.
640492
logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}")
641493
flushTimeMs = timeTakenMs {
642-
// Flush updates to all available column families
643-
assert(!colFamilyNameToHandleMap.isEmpty)
644-
db.flush(flushOptions, colFamilyNameToHandleMap.values.toSeq.asJava)
494+
db.flush(flushOptions)
645495
}
646496

647497
if (conf.compactOnCommit) {
648498
logInfo("Compacting")
649499
compactTimeMs = timeTakenMs {
650-
// Perform compaction on all available column families
651-
assert(!colFamilyNameToHandleMap.isEmpty)
652-
colFamilyNameToHandleMap.values.foreach(db.compactRange(_))
500+
db.compactRange()
653501
}
654502
}
655503

@@ -860,11 +708,6 @@ class RocksDB(
860708
nativeStats.getTickerCount(typ)
861709
}
862710

863-
// Used for metrics reporting around internal/external column families
864-
val numInternalColFamilies = colFamilyNameToHandleMap
865-
.keys.filter(checkInternalColumnFamilies(_)).size
866-
val numExternalColFamilies = colFamilyNameToHandleMap.keys.size - numInternalColFamilies
867-
868711
// if bounded memory usage is enabled, we share the block cache across all state providers
869712
// running on the same node and account the usage to this single cache. In this case, its not
870713
// possible to provide partition level or query level memory usage.
@@ -886,8 +729,6 @@ class RocksDB(
886729
filesCopied = fileManagerMetrics.filesCopied,
887730
filesReused = fileManagerMetrics.filesReused,
888731
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
889-
numExternalColFamilies = numExternalColFamilies,
890-
numInternalColFamilies = numInternalColFamilies,
891732
nativeOpsMetrics = nativeOpsMetrics)
892733
}
893734

@@ -959,47 +800,16 @@ class RocksDB(
959800
acquireLock.notifyAll()
960801
}
961802

962-
private def getDBProperty(property: String): Long = {
963-
// get cumulative sum across all available column families
964-
assert(!colFamilyNameToHandleMap.isEmpty)
965-
colFamilyNameToHandleMap
966-
.values
967-
.map(handle => db.getProperty(handle, property).toLong)
968-
.sum
969-
}
803+
private def getDBProperty(property: String): Long = db.getProperty(property).toLong
970804

971805
private def openDB(): Unit = {
972806
assert(db == null)
973-
val colFamilies = NativeRocksDB.listColumnFamilies(dbOptions, workingDir.toString)
974-
975-
val colFamilyDescriptors = new ArrayBuffer[ColumnFamilyDescriptor]
976-
// populate the list of available col family descriptors
977-
colFamilies.asScala.toList.foreach { family =>
978-
val descriptor = new ColumnFamilyDescriptor(family, columnFamilyOptions)
979-
colFamilyDescriptors += descriptor
980-
}
981-
982-
if (colFamilyDescriptors.isEmpty) {
983-
colFamilyDescriptors += new ColumnFamilyDescriptor(NativeRocksDB.DEFAULT_COLUMN_FAMILY,
984-
columnFamilyOptions)
985-
}
986-
987-
val colFamilyHandles = new java.util.ArrayList[ColumnFamilyHandle]()
988-
db = NativeRocksDB.open(new DBOptions(dbOptions), workingDir.toString,
989-
colFamilyDescriptors.asJava, colFamilyHandles)
990-
991-
// Store the mapping of names to handles in the internal map
992-
colFamilyHandles.asScala.toList.foreach { handle =>
993-
colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
994-
}
807+
db = NativeRocksDB.open(dbOptions, workingDir.toString)
995808
logInfo(log"Opened DB with conf ${MDC(LogKeys.CONFIG, conf)}")
996809
}
997810

998811
private def closeDB(): Unit = {
999812
if (db != null) {
1000-
// Close the column family handles in case multiple column families are used
1001-
colFamilyNameToHandleMap.values.map(handle => handle.close)
1002-
colFamilyNameToHandleMap.clear()
1003813

1004814
// Cancel and wait until all background work finishes
1005815
db.cancelAllBackgroundWork(true)
@@ -1298,8 +1108,6 @@ case class RocksDBMetrics(
12981108
bytesCopied: Long,
12991109
filesReused: Long,
13001110
zipFileBytesUncompressed: Option[Long],
1301-
numExternalColFamilies: Long,
1302-
numInternalColFamilies: Long,
13031111
nativeOpsMetrics: Map[String, Long]) {
13041112
def json: String = Serialization.write(this)(RocksDBMetrics.format)
13051113
}

0 commit comments

Comments
 (0)