RocksDB.scala
@@ -26,6 +26,8 @@ import scala.ref.WeakReference
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.rocksdb.{RocksDB => NativeRocksDB, _}

import org.apache.spark.TaskContext
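
The two json4s imports feed the `json` helpers added further down. A minimal sketch of the serialization pattern they enable (the `Point` class here is purely illustrative):

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

case class Point(x: Int, y: Int)

implicit val format = Serialization.formats(NoTypeHints)
// NoTypeHints keeps class-name metadata out of the output:
Serialization.write(Point(1, 2)) // {"x":1,"y":2}
```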
@@ -72,6 +74,7 @@ class RocksDB(
dbOptions.setTableFormatConfig(tableFormatConfig)
private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
dbOptions.setStatistics(new Statistics())
private val nativeStats = dbOptions.statistics()
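
`dbOptions.statistics()` returns the `Statistics` instance installed by `setStatistics` on the previous line; it is what later feeds the native histograms in `metrics`. A standalone sketch of the RocksJava calls involved (path and key are made up):

```scala
import org.rocksdb.{HistogramType, Options, RocksDB, Statistics}

RocksDB.loadLibrary()
val opts = new Options().setCreateIfMissing(true)
opts.setStatistics(new Statistics()) // enable native stats collection
val stats = opts.statistics()

val db = RocksDB.open(opts, "/tmp/rocksdb-example")
db.put("k".getBytes, "v".getBytes)
db.get("k".getBytes)

// Histograms accumulate across all operations on this DB instance:
val h = stats.getHistogramData(HistogramType.DB_GET)
println(s"get latency p95 = ${h.getPercentile95} us")
```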

private val workingDir = createTempDir("workingDir")
private val fileManager = new RocksDBFileManager(
@@ -84,6 +87,7 @@ class RocksDB(
@volatile private var loadedVersion = -1L // -1 = nothing valid is loaded
@volatile private var numKeysOnLoadedVersion = 0L
@volatile private var numKeysOnWritingVersion = 0L
@volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS

@GuardedBy("acquireLock")
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
@@ -105,6 +109,7 @@ class RocksDB(
numKeysOnWritingVersion = metadata.numKeys
numKeysOnLoadedVersion = metadata.numKeys
loadedVersion = version
fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
}
writeBatch.clear()
logInfo(s"Loaded $version")
@@ -223,6 +228,7 @@ class RocksDB(
}
numKeysOnLoadedVersion = numKeysOnWritingVersion
loadedVersion = newVersion
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
commitLatencyMs ++= Map(
"writeBatch" -> writeTimeMs,
"flush" -> flushTimeMs,
@@ -231,6 +237,7 @@ class RocksDB(
"checkpoint" -> checkpointTimeMs,
"fileSync" -> fileSyncTimeMs
)
logInfo(s"Committed $newVersion, stats = ${metrics.json}")
loadedVersion
} catch {
case t: Throwable =>
@@ -283,6 +290,30 @@ class RocksDB(
/** Get the latest version available in the DFS */
def getLatestVersion(): Long = fileManager.getLatestVersion()

/** Get current instantaneous statistics */
def metrics: RocksDBMetrics = {
import HistogramType._
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
val nativeOps = Seq("get" -> DB_GET, "put" -> DB_WRITE).toMap
val nativeOpsLatencyMicros = nativeOps.mapValues { typ =>
RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
}

RocksDBMetrics(
numKeysOnLoadedVersion,
numKeysOnWritingVersion,
readerMemUsage + memTableMemUsage,
totalSSTFilesBytes,
nativeOpsLatencyMicros.toMap,
commitLatencyMs,
bytesCopied = fileManagerMetrics.bytesCopied,
filesCopied = fileManagerMetrics.filesCopied,
filesReused = fileManagerMetrics.filesReused,
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed)
}

private def acquire(): Unit = acquireLock.synchronized {
val newAcquiredThreadInfo = AcquiredThreadInfo()
val waitStartTime = System.currentTimeMillis
@@ -314,6 +345,10 @@ class RocksDB(
acquireLock.notifyAll()
}

private def getDBProperty(property: String): Long = {
db.getProperty(property).toLong
}
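
`getProperty` reads RocksDB's built-in gauge strings; the three property names used in `metrics` above are standard RocksDB properties whose values parse as longs. Assuming an open RocksJava handle `db`:

```scala
// Property values come back as strings, hence the .toLong in getDBProperty.
val memUsage =
  db.getProperty("rocksdb.estimate-table-readers-mem").toLong +
  db.getProperty("rocksdb.size-all-mem-tables").toLong
val sstBytes = db.getProperty("rocksdb.total-sst-files-size").toLong
```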

private def openDB(): Unit = {
assert(db == null)
db = NativeRocksDB.open(dbOptions, workingDir.toString)
@@ -388,7 +423,6 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)

/**
* Configurations for optimizing RocksDB
*
* @param compactOnCommit Whether to compact RocksDB data before commit / checkpointing
*/
case class RocksDBConf(
@@ -442,6 +476,42 @@ object RocksDBConf {
def apply(): RocksDBConf = apply(new StateStoreConf())
}

/** Class to represent stats from each commit. */
case class RocksDBMetrics(
numCommittedKeys: Long,
numUncommittedKeys: Long,
memUsageBytes: Long,
totalSSTFilesBytes: Long,
nativeOpsLatencyMicros: Map[String, RocksDBNativeHistogram],
lastCommitLatencyMs: Map[String, Long],
filesCopied: Long,
bytesCopied: Long,
filesReused: Long,
zipFileBytesUncompressed: Option[Long]) {
def json: String = Serialization.write(this)(RocksDBMetrics.format)
}

object RocksDBMetrics {
val format = Serialization.formats(NoTypeHints)
}

/** Class to wrap RocksDB's native histogram */
case class RocksDBNativeHistogram(
avg: Double, stddev: Double, median: Double, p95: Double, p99: Double) {
def json: String = Serialization.write(this)(RocksDBMetrics.format)
}

object RocksDBNativeHistogram {
def apply(nativeHist: HistogramData): RocksDBNativeHistogram = {
RocksDBNativeHistogram(
nativeHist.getAverage,
nativeHist.getStandardDeviation,
nativeHist.getMedian,
nativeHist.getPercentile95,
nativeHist.getPercentile99)
}
}
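
With both `json` helpers in place, the `Committed $newVersion, stats = ...` log line above carries the full metrics payload. A rough illustration of the histogram part, with made-up numbers:

```scala
val hist = RocksDBNativeHistogram(
  avg = 12.3, stddev = 4.5, median = 10.0, p95 = 30.0, p99 = 55.0)
hist.json
// {"avg":12.3,"stddev":4.5,"median":10.0,"p95":30.0,"p99":55.0}
```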

case class AcquiredThreadInfo() {
val threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread())
val tc: TaskContext = TaskContext.get()

RocksDBFileManager.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
@@ -134,6 +135,22 @@ class RocksDBFileManager(
override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
}

/**
* Metrics for loading a checkpoint from DFS. Every loadCheckpointFromDfs call updates these
* metrics, so they effectively record the latest load.
*/
@volatile private var loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS

/**
* Metrics for saving a checkpoint to DFS. Every saveCheckpointToDfs call updates these
* metrics, so they effectively record the latest save.
*/
@volatile private var saveCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS

def latestLoadCheckpointMetrics: RocksDBFileManagerMetrics = loadCheckpointMetrics

def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
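
Both fields are `@volatile` and only ever replaced wholesale with a fresh immutable case class, so these getters hand out a consistent snapshot without locking. The same publish pattern in isolation (`MetricsHolder` is a hypothetical name, not part of this change):

```scala
class MetricsHolder {
  // Build a complete immutable value, then publish it with one volatile write;
  // a concurrent reader sees the old snapshot or the new one, never a mix.
  @volatile private var snapshot = RocksDBFileManagerMetrics.EMPTY_METRICS
  def latest: RocksDBFileManagerMetrics = snapshot
  def publish(m: RocksDBFileManagerMetrics): Unit = snapshot = m
}
```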

/** Save all the files in given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
@@ -336,6 +353,11 @@ class RocksDBFileManager(
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)

saveCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
filesCopied = filesCopied,
filesReused = filesReused)

immutableFiles
}

@@ -387,6 +409,11 @@ class RocksDBFileManager(
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
s"$filesReused files reused.")

loadCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
filesCopied = filesCopied,
filesReused = filesReused)
}

/** Get the SST files required for a version from the version zip file in DFS */
@@ -420,6 +447,9 @@
}
zout.close() // so that any error in closing also cancels the output stream
logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
// The other fields of saveCheckpointMetrics should have been filled already
saveCheckpointMetrics =
saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes))
} catch {
case e: Exception =>
// Cancel the actual output stream first, so that zout.close() does not write the file
@@ -486,6 +516,23 @@
}
}

/**
* Metrics regarding RocksDB file sync between local and DFS.
*/
case class RocksDBFileManagerMetrics(
filesCopied: Long,
bytesCopied: Long,
filesReused: Long,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
zipFileBytesUncompressed: Option[Long] = None)

/**
* Metrics to return when requested but no operation has been performed.
*/
object RocksDBFileManagerMetrics {
val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
}
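
The `@JsonDeserialize(contentAs = classOf[java.lang.Long])` hint is needed because the element type of `Option[Long]` is erased at runtime; without it, jackson-module-scala can materialize the option's content as an `Integer`, and later unboxing to `Long` fails. A round-trip sketch, assuming a mapper configured like the one earlier in this file:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

val json = """{"filesCopied":1,"bytesCopied":10,"filesReused":0,"zipFileBytesUncompressed":5}"""
val m = mapper.readValue[RocksDBFileManagerMetrics](json)
m.zipFileBytesUncompressed.map(_ + 1L) // safe: the content really is a java.lang.Long
```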

/**
* Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
* changes to this MUST be backward-compatible.