Commit a580a1d
more metrics

1 parent ca6acf0 commit a580a1d

2 files changed: 115 additions and 1 deletion

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 68 additions & 1 deletion
@@ -26,6 +26,8 @@ import scala.ref.WeakReference
 import scala.util.Try

 import org.apache.hadoop.conf.Configuration
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 import org.rocksdb.{RocksDB => NativeRocksDB, _}

 import org.apache.spark.TaskContext
@@ -72,6 +74,7 @@ class RocksDB(
   dbOptions.setTableFormatConfig(tableFormatConfig)
   private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
   dbOptions.setStatistics(new Statistics())
+  private val nativeStats = dbOptions.statistics()

   private val workingDir = createTempDir("workingDir")
   private val fileManager = new RocksDBFileManager(
@@ -84,6 +87,8 @@ class RocksDB(
   @volatile private var loadedVersion = -1L  // -1 = nothing valid is loaded
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
+  @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
+
   @GuardedBy("acquireLock")
   @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
@@ -105,6 +110,7 @@ class RocksDB(
         numKeysOnWritingVersion = metadata.numKeys
         numKeysOnLoadedVersion = metadata.numKeys
         loadedVersion = version
+        fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
       }
       writeBatch.clear()
       logInfo(s"Loaded $version")
@@ -223,6 +229,7 @@ class RocksDB(
       }
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
+      fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       commitLatencyMs ++= Map(
         "writeBatch" -> writeTimeMs,
         "flush" -> flushTimeMs,
@@ -231,6 +238,7 @@ class RocksDB(
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
+      logInfo(s"Committed $newVersion, stats = ${metrics.json}")
       loadedVersion
     } catch {
       case t: Throwable =>
@@ -283,6 +291,30 @@ class RocksDB(
   /** Get the latest version available in the DFS */
   def getLatestVersion(): Long = fileManager.getLatestVersion()

+  /** Get current instantaneous statistics */
+  def metrics: RocksDBMetrics = {
+    import HistogramType._
+    val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
+    val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
+    val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
+    val nativeOps = Seq("get" -> DB_GET, "put" -> DB_WRITE).toMap
+    val nativeOpsLatencyMicros = nativeOps.mapValues { typ =>
+      RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
+    }
+
+    RocksDBMetrics(
+      numKeysOnLoadedVersion,
+      numKeysOnWritingVersion,
+      readerMemUsage + memTableMemUsage,
+      totalSSTFilesBytes,
+      nativeOpsLatencyMicros,
+      commitLatencyMs,
+      bytesCopied = fileManagerMetrics.bytesCopied,
+      filesCopied = fileManagerMetrics.filesCopied,
+      filesReused = fileManagerMetrics.filesReused,
+      zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed)
+  }
+
   private def acquire(): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
     val waitStartTime = System.currentTimeMillis
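
For readers unfamiliar with the RocksDB JNI surface that the new `metrics` method builds on, here is a minimal, self-contained sketch of the same native calls (DB properties and latency histograms). It is not part of the patch; the database path and sample key/value are hypothetical.

import org.rocksdb.{HistogramType, Options, RocksDB => NativeRocksDB, Statistics}

object NativeStatsSketch {
  def main(args: Array[String]): Unit = {
    NativeRocksDB.loadLibrary()
    val stats = new Statistics()
    val options = new Options().setCreateIfMissing(true).setStatistics(stats)
    val db = NativeRocksDB.open(options, "/tmp/rocksdb-stats-sketch") // hypothetical path

    db.put("k".getBytes("UTF-8"), "v".getBytes("UTF-8"))
    db.get("k".getBytes("UTF-8"))

    // Instantaneous DB properties, analogous to getDBProperty in the patch
    val sstBytes = db.getLongProperty("rocksdb.total-sst-files-size")
    // Native latency histograms, as wrapped by RocksDBNativeHistogram above
    val getHist = stats.getHistogramData(HistogramType.DB_GET)
    println(s"sst-files=$sstBytes bytes, get p95=${getHist.getPercentile95} us")

    db.close()
  }
}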
@@ -388,7 +420,6 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)

 /**
  * Configurations for optimizing RocksDB
- *
  * @param compactOnCommit Whether to compact RocksDB data before commit / checkpointing
  */
 case class RocksDBConf(
@@ -442,6 +473,42 @@ object RocksDBConf {
   def apply(): RocksDBConf = apply(new StateStoreConf())
 }

+/** Class to represent stats from each commit. */
+case class RocksDBMetrics(
+    numCommittedKeys: Long,
+    numUncommittedKeys: Long,
+    memUsageBytes: Long,
+    totalSSTFilesBytes: Long,
+    nativeOpsLatencyMicros: Map[String, RocksDBNativeHistogram],
+    lastCommitLatencyMs: Map[String, Long],
+    filesCopied: Long,
+    bytesCopied: Long,
+    filesReused: Long,
+    zipFileBytesUncompressed: Option[Long]) {
+  def json: String = Serialization.write(this)(RocksDBMetrics.format)
+}
+
+object RocksDBMetrics {
+  val format = Serialization.formats(NoTypeHints)
+}
+
+/** Class to wrap RocksDB's native histogram */
+case class RocksDBNativeHistogram(
+    avg: Double, stddev: Double, median: Double, p95: Double, p99: Double) {
+  def json: String = Serialization.write(this)(RocksDBMetrics.format)
+}
+
+object RocksDBNativeHistogram {
+  def apply(nativeHist: HistogramData): RocksDBNativeHistogram = {
+    RocksDBNativeHistogram(
+      nativeHist.getAverage,
+      nativeHist.getStandardDeviation,
+      nativeHist.getMedian,
+      nativeHist.getPercentile95,
+      nativeHist.getPercentile99)
+  }
+}
+
 case class AcquiredThreadInfo() {
   val threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread())
   val tc: TaskContext = TaskContext.get()
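
The `json` helpers above rely on json4s reflective serialization. A hedged sketch of how a flat metrics record renders; the `Sample` class and values here are made up, not from the patch:

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

object JsonSketch {
  // Hypothetical flat record standing in for RocksDBMetrics
  case class Sample(filesCopied: Long, p95: Double, optional: Option[Long])

  def main(args: Array[String]): Unit = {
    implicit val format = Serialization.formats(NoTypeHints)
    // write() reflects over the case class fields; NoTypeHints keeps the
    // output free of class-name metadata, and None fields are omitted
    println(Serialization.write(Sample(10L, 4.5, None)))
    // expected output: {"filesCopied":10,"p95":4.5}
  }
}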

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 47 additions & 0 deletions
@@ -29,6 +29,7 @@ import scala.collection.mutable

 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
@@ -134,6 +135,22 @@ class RocksDBFileManager(
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }

+  /**
+   * Metrics for loading checkpoint from DFS. Every loadCheckpointFromDfs call will
+   * update these metrics, so this effectively records the latest metrics.
+   */
+  @volatile private var loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
+
+  /**
+   * Metrics for saving checkpoint to DFS. Every saveCheckpointToDfs call will
+   * update these metrics, so this effectively records the latest metrics.
+   */
+  @volatile private var saveCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
+
+  def latestLoadCheckpointMetrics: RocksDBFileManagerMetrics = loadCheckpointMetrics
+
+  def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
+
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
   def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
     logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
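
The two `@volatile` fields above follow a publish-by-replacement pattern: each save or load builds a fresh immutable record and swaps it in with a single volatile write, so readers get a consistent snapshot without locking. A minimal sketch of the idea; the holder class is hypothetical:

// Hypothetical holder illustrating the latest-metrics pattern used above
class LatestMetricsHolder {
  @volatile private var latest = RocksDBFileManagerMetrics.EMPTY_METRICS

  def record(filesCopied: Long, bytesCopied: Long, filesReused: Long): Unit = {
    // Build the whole immutable record first, then publish it in one volatile write
    latest = RocksDBFileManagerMetrics(
      filesCopied = filesCopied,
      bytesCopied = bytesCopied,
      filesReused = filesReused)
  }

  // Readers never see a half-updated record: they observe either the old
  // reference or the new one
  def snapshot: RocksDBFileManagerMetrics = latest
}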
@@ -336,6 +353,11 @@ class RocksDBFileManager(
       s" DFS for version $version. $filesReused files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)

+    saveCheckpointMetrics = RocksDBFileManagerMetrics(
+      bytesCopied = bytesCopied,
+      filesCopied = filesCopied,
+      filesReused = filesReused)
+
     immutableFiles
   }

@@ -387,6 +409,11 @@ class RocksDBFileManager(
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
       s"$filesReused files reused.")
+
+    loadCheckpointMetrics = RocksDBFileManagerMetrics(
+      bytesCopied = bytesCopied,
+      filesCopied = filesCopied,
+      filesReused = filesReused)
   }

   /** Get the SST files required for a version from the version zip file in DFS */
@@ -420,6 +447,9 @@ class RocksDBFileManager(
       }
       zout.close()  // so that any error in closing also cancels the output stream
       logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
+      // The other fields of saveCheckpointMetrics should have been filled already
+      saveCheckpointMetrics =
+        saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes))
     } catch {
       case e: Exception =>
         // Cancel the actual output stream first, so that zout.close() does not write the file
@@ -486,6 +516,23 @@ class RocksDBFileManager(
   }
 }

+/**
+ * Metrics regarding RocksDB file sync between local and DFS.
+ */
+case class RocksDBFileManagerMetrics(
+    filesCopied: Long,
+    bytesCopied: Long,
+    filesReused: Long,
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    zipFileBytesUncompressed: Option[Long] = None)
+
+/**
+ * Metrics to return when requested but no operation has been performed.
+ */
+object RocksDBFileManagerMetrics {
+  val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
+}
+
 /**
  * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
  * changes to this MUST be backward-compatible.
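
The `@JsonDeserialize(contentAs = classOf[java.lang.Long])` annotation matters because Scala's `Option[Long]` erases to `Option[Object]`: without the hint, Jackson deserializes a small JSON number into a `java.lang.Integer` inside the `Option`, which later fails when unboxed as a `Long`. A hedged sketch of the effect; the `Sized` class is hypothetical and the mapper setup only mirrors this file's configuration:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical record with the same annotation pattern as
// RocksDBFileManagerMetrics.zipFileBytesUncompressed
case class Sized(
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    size: Option[Long])

object DeserializeSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val parsed = mapper.readValue("""{"size": 42}""", classOf[Sized])
    // contentAs forces a java.lang.Long inside the Option, so unboxing is safe
    println(parsed.size.map(_ + 1)) // Some(43)
  }
}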
