Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7179e33
a dirty working version
jingz-db Jun 26, 2024
d96e164
refactor test suites, have some failing tests
jingz-db Jun 27, 2024
eb7378d
a full working version with full test suites
jingz-db Jun 27, 2024
2f44aaf
need to fix removeColFamilyIfExists test suite
jingz-db Jun 27, 2024
bb4e2bf
fix all suites, add validation suites for noprefix & prefix
jingz-db Jun 27, 2024
febcb07
few TODOs, integration to be added
jingz-db Jun 27, 2024
0856191
scala style
jingz-db Jun 27, 2024
f906369
scala style
jingz-db Jun 27, 2024
36a5247
use 2 bytes as prefix instead of 8
jingz-db Jul 1, 2024
4c0af0e
unused codes in RocksDB
jingz-db Jul 2, 2024
167f8b3
remove unused rocksdb code
jingz-db Jul 2, 2024
1a48930
RocksDBSuite to be fixed
jingz-db Jul 3, 2024
053b696
fix all suites
jingz-db Jul 3, 2024
d32c975
Merge branch 'master' into virtual-col-family
jingz-db Jul 3, 2024
bfc003a
fix scala style after rebase
jingz-db Jul 3, 2024
1393a0d
resolve comments, add comments in code
jingz-db Jul 3, 2024
23f9d41
move useColumnFamilies into key encoder API
jingz-db Jul 4, 2024
cf7f7d6
empty commit to trigger gh job
jingz-db Jul 4, 2024
f8b39b9
refactor put prefix function
jingz-db Jul 4, 2024
2c383ae
few nits
jingz-db Jul 4, 2024
985d2e1
resolve partial comments, TODO refactor of RocksDBKeyEncoder
jingz-db Jul 5, 2024
f775e90
refactored rocksdbkeyencoder - put vcfId into encoder base class
jingz-db Jul 5, 2024
95e5f99
refactor rocksdbkeyencoder
jingz-db Jul 5, 2024
07b8ae2
suite nits
jingz-db Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

import scala.collection.{mutable, Map}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.ref.WeakReference
import scala.util.Try

Expand Down Expand Up @@ -64,7 +63,6 @@ case object StoreMaintenance extends RocksDBOpType("store_maintenance")
* @param localRootDir Root directory on local disk that is used for the working and checkpointing dirs
* @param hadoopConf Hadoop configuration for talking to the remote file system
* @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
* @param useColumnFamilies Used to determine whether a single or multiple column families are used
*/
class RocksDB(
dfsRootDir: String,
Expand Down Expand Up @@ -141,11 +139,6 @@ class RocksDB(
dbOptions.setWriteBufferManager(writeBufferManager)
}

// Maintain mapping of column family name to handle
@GuardedBy("acquireLock")
private val colFamilyNameToHandleMap =
scala.collection.mutable.Map[String, ColumnFamilyHandle]()

private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
dbOptions.setStatistics(new Statistics())
private val nativeStats = dbOptions.statistics()
Expand Down Expand Up @@ -318,20 +311,16 @@ class RocksDB(
var changelogReader: StateStoreChangelogReader = null
try {
changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
changelogReader.foreach { case (recordType, key, value, colFamilyName) =>
if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
createColFamilyIfAbsent(colFamilyName, checkInternalColumnFamilies(colFamilyName))
}

changelogReader.foreach { case (recordType, key, value) =>
recordType match {
case RecordType.PUT_RECORD =>
put(key, value, colFamilyName)
put(key, value)

case RecordType.DELETE_RECORD =>
remove(key, colFamilyName)
remove(key)

case RecordType.MERGE_RECORD =>
merge(key, value, colFamilyName)
merge(key, value)
}
}
} finally {
Expand All @@ -341,145 +330,28 @@ class RocksDB(
loadedVersion = endVersion
}

/**
* Function to check if the column family exists in the state store instance.
* @param colFamilyName - name of the column family
* @return - true if the column family exists, false otherwise
*/
private def checkColFamilyExists(colFamilyName: String): Boolean = {
colFamilyNameToHandleMap.contains(colFamilyName)
}

private val multColFamiliesDisabledStr = "multiple column families disabled in " +
"RocksDBStateStoreProvider"

/**
* Function to verify invariants for column family based operations such as get, put, remove etc.
* @param operationName - name of the store operation
* @param colFamilyName - name of the column family
*/
private def verifyColFamilyOperations(
operationName: String,
colFamilyName: String): Unit = {
if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
// if the state store instance does not support multiple column families, throw an exception
if (!useColumnFamilies) {
throw StateStoreErrors.unsupportedOperationException(operationName,
multColFamiliesDisabledStr)
}

// if the column family name is empty or contains leading/trailing whitespaces, throw an
// exception
if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) {
throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
}

// if the column family does not exist, throw an exception
if (!checkColFamilyExists(colFamilyName)) {
throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
colFamilyName)
}
}
}

/**
* Function to verify invariants for column family creation or deletion operations.
* @param operationName - name of the store operation
* @param colFamilyName - name of the column family
*/
private def verifyColFamilyCreationOrDeletion(
operationName: String,
colFamilyName: String,
isInternal: Boolean = false): Unit = {
// if the state store instance does not support multiple column families, throw an exception
if (!useColumnFamilies) {
throw StateStoreErrors.unsupportedOperationException(operationName,
multColFamiliesDisabledStr)
}

// if the column family name is empty or contains leading/trailing whitespaces
// or using the reserved "default" column family, throw an exception
if (colFamilyName.isEmpty
|| colFamilyName.trim != colFamilyName
|| colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
}

// if the column family is not internal and uses reserved characters, throw an exception
if (!isInternal && colFamilyName.charAt(0) == '_') {
throw StateStoreErrors.cannotCreateColumnFamilyWithReservedChars(colFamilyName)
}
}

/**
* Check whether the column family name is for internal column families.
* @param cfName - column family name
* @return - true if the column family is for internal use, false otherwise
*/
private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'

/**
* Create RocksDB column family, if not created already
*/
def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Unit = {
verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal)
if (!checkColFamilyExists(colFamilyName)) {
assert(db != null)
val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, columnFamilyOptions)
val handle = db.createColumnFamily(descriptor)
colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
}
}

/**
* Remove RocksDB column family, if exists
*/
def removeColFamilyIfExists(colFamilyName: String): Boolean = {
verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName)
if (checkColFamilyExists(colFamilyName)) {
assert(db != null)
val handle = colFamilyNameToHandleMap(colFamilyName)
db.dropColumnFamily(handle)
colFamilyNameToHandleMap.remove(colFamilyName)
true
} else {
false
}
}

/**
* Get the value for the given key if present, or null.
* @note This will return the last written value even if it was uncommitted.
*/
def get(
key: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] = {
verifyColFamilyOperations("get", colFamilyName)
db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
def get(key: Array[Byte]): Array[Byte] = {
db.get(readOptions, key)
}

/**
* Put the given value for the given key.
* @note This update is not committed to disk until commit() is called.
*/
def put(
key: Array[Byte],
value: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
verifyColFamilyOperations("put", colFamilyName)
def put(key: Array[Byte], value: Array[Byte]): Unit = {
if (conf.trackTotalNumberOfRows) {
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
val oldValue = db.get(readOptions, key)
if (oldValue == null) {
numKeysOnWritingVersion += 1
}
}

db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
if (useColumnFamilies) {
changelogWriter.foreach(_.put(key, value, colFamilyName))
} else {
changelogWriter.foreach(_.put(key, value))
}
db.put(writeOptions, key, value)
changelogWriter.foreach(_.put(key, value))
}

/**
Expand All @@ -493,57 +365,40 @@ class RocksDB(
*
* @note This update is not committed to disk until commit() is called.
*/
def merge(
key: Array[Byte],
value: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
if (!useColumnFamilies) {
throw StateStoreErrors.unsupportedOperationException("merge",
multColFamiliesDisabledStr)
}
verifyColFamilyOperations("merge", colFamilyName)
def merge(key: Array[Byte], value: Array[Byte]): Unit = {

if (conf.trackTotalNumberOfRows) {
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
val oldValue = db.get(readOptions, key)
if (oldValue == null) {
numKeysOnWritingVersion += 1
}
}
db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
db.merge(writeOptions, key, value)

changelogWriter.foreach(_.merge(key, value, colFamilyName))
changelogWriter.foreach(_.merge(key, value))
}

/**
* Remove the key if present.
* @note This update is not committed to disk until commit() is called.
*/
def remove(
key: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
verifyColFamilyOperations("remove", colFamilyName)
def remove(key: Array[Byte]): Unit = {
if (conf.trackTotalNumberOfRows) {
val value = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
val value = db.get(readOptions, key)
if (value != null) {
numKeysOnWritingVersion -= 1
}
}
db.delete(colFamilyNameToHandleMap(colFamilyName), writeOptions, key)
if (useColumnFamilies) {
changelogWriter.foreach(_.delete(key, colFamilyName))
} else {
changelogWriter.foreach(_.delete(key))
}
db.delete(writeOptions, key)
changelogWriter.foreach(_.delete(key))
}

/**
* Get an iterator of all committed and uncommitted key-value pairs.
*/
def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
Iterator[ByteArrayPair] = {
verifyColFamilyOperations("iterator", colFamilyName)
def iterator(): Iterator[ByteArrayPair] = {

val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
val iter = db.newIterator()
logInfo(log"Getting iterator from version ${MDC(LogKeys.LOADED_VERSION, loadedVersion)}")
iter.seekToFirst()

Expand All @@ -569,9 +424,8 @@ class RocksDB(
}
}

private def countKeys(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Long = {
verifyColFamilyOperations("countKeys", colFamilyName)
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
private def countKeys(): Long = {
val iter = db.newIterator()

try {
logInfo(log"Counting keys - getting iterator from version " +
Expand All @@ -591,10 +445,8 @@ class RocksDB(
}
}

def prefixScan(prefix: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
Iterator[ByteArrayPair] = {
verifyColFamilyOperations("prefixScan", colFamilyName)
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
val iter = db.newIterator()
iter.seek(prefix)

// Attempt to close this iterator if there is a task failure, or a task interruption.
Expand Down Expand Up @@ -639,17 +491,13 @@ class RocksDB(
// because rocksdb wal is disabled.
logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}")
flushTimeMs = timeTakenMs {
// Flush updates to all available column families
assert(!colFamilyNameToHandleMap.isEmpty)
db.flush(flushOptions, colFamilyNameToHandleMap.values.toSeq.asJava)
db.flush(flushOptions)
}

if (conf.compactOnCommit) {
logInfo("Compacting")
compactTimeMs = timeTakenMs {
// Perform compaction on all available column families
assert(!colFamilyNameToHandleMap.isEmpty)
colFamilyNameToHandleMap.values.foreach(db.compactRange(_))
db.compactRange()
}
}

Expand Down Expand Up @@ -860,11 +708,6 @@ class RocksDB(
nativeStats.getTickerCount(typ)
}

// Used for metrics reporting around internal/external column families
val numInternalColFamilies = colFamilyNameToHandleMap
.keys.filter(checkInternalColumnFamilies(_)).size
val numExternalColFamilies = colFamilyNameToHandleMap.keys.size - numInternalColFamilies

// if bounded memory usage is enabled, we share the block cache across all state providers
// running on the same node and account the usage to this single cache. In this case, it's not
// possible to provide partition level or query level memory usage.
Expand All @@ -886,8 +729,6 @@ class RocksDB(
filesCopied = fileManagerMetrics.filesCopied,
filesReused = fileManagerMetrics.filesReused,
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
numExternalColFamilies = numExternalColFamilies,
numInternalColFamilies = numInternalColFamilies,
nativeOpsMetrics = nativeOpsMetrics)
}

Expand Down Expand Up @@ -959,47 +800,16 @@ class RocksDB(
acquireLock.notifyAll()
}

private def getDBProperty(property: String): Long = {
// get cumulative sum across all available column families
assert(!colFamilyNameToHandleMap.isEmpty)
colFamilyNameToHandleMap
.values
.map(handle => db.getProperty(handle, property).toLong)
.sum
}
private def getDBProperty(property: String): Long = db.getProperty(property).toLong

private def openDB(): Unit = {
assert(db == null)
val colFamilies = NativeRocksDB.listColumnFamilies(dbOptions, workingDir.toString)

val colFamilyDescriptors = new ArrayBuffer[ColumnFamilyDescriptor]
// populate the list of available col family descriptors
colFamilies.asScala.toList.foreach { family =>
val descriptor = new ColumnFamilyDescriptor(family, columnFamilyOptions)
colFamilyDescriptors += descriptor
}

if (colFamilyDescriptors.isEmpty) {
colFamilyDescriptors += new ColumnFamilyDescriptor(NativeRocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions)
}

val colFamilyHandles = new java.util.ArrayList[ColumnFamilyHandle]()
db = NativeRocksDB.open(new DBOptions(dbOptions), workingDir.toString,
colFamilyDescriptors.asJava, colFamilyHandles)

// Store the mapping of names to handles in the internal map
colFamilyHandles.asScala.toList.foreach { handle =>
colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
}
db = NativeRocksDB.open(dbOptions, workingDir.toString)
logInfo(log"Opened DB with conf ${MDC(LogKeys.CONFIG, conf)}")
}

private def closeDB(): Unit = {
if (db != null) {
// Close the column family handles in case multiple column families are used
colFamilyNameToHandleMap.values.map(handle => handle.close)
colFamilyNameToHandleMap.clear()

// Cancel and wait until all background work finishes
db.cancelAllBackgroundWork(true)
Expand Down Expand Up @@ -1298,8 +1108,6 @@ case class RocksDBMetrics(
bytesCopied: Long,
filesReused: Long,
zipFileBytesUncompressed: Option[Long],
numExternalColFamilies: Long,
numInternalColFamilies: Long,
nativeOpsMetrics: Map[String, Long]) {
def json: String = Serialization.write(this)(RocksDBMetrics.format)
}
Expand Down
Loading