From b6202545f206aaf9ef4b30891209f095dea73059 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Thu, 31 May 2018 23:38:00 +0900
Subject: [PATCH 1/9] [SPARK-24441][SS] Expose total size of states in HDFSBackedStateStoreProvider

* NOTE: This commit is a squashed version of commits which were already reviewed by others
* expose the estimated size of the cache (loadedMaps) in HDFSBackedStateStoreProvider as a custom metric of StateStore
* lowercase the description of the new metric: existing metrics are all lowercase
* reduce the indentation level by applying synchronized from the start of the function definition
* expose another custom metric: (average) count of versions stored in loadedMaps
* Also expose custom metrics in state store to StreamingQueryStatus
* the average metric is implemented in a tricky way, so just exclude average metrics for now
* rename metricProviderLoaderMapSize to metricProviderLoaderMapSizeBytes
* Fix broken tests
* Scala Map doesn't seem to work well with json4s: followed the existing approach to resolve this
* Add more assertions
---
 .../sql/execution/metric/SQLMetrics.scala     |  8 +-
 .../state/HDFSBackedStateStoreProvider.scala  | 18 ++++-
 .../streaming/state/StateStore.scala          |  2 +
 .../state/SymmetricHashJoinStateManager.scala |  2 +
 .../streaming/statefulOperators.scala         | 21 +++++-
 .../apache/spark/sql/streaming/progress.scala | 23 ++++--
 .../streaming/state/StateStoreSuite.scala     | 75 +++++++++++++++++++
 .../StreamingQueryListenerSuite.scala         |  2 +-
 ...StreamingQueryStatusAndProgressSuite.scala | 10 ++-
 9 files changed, 144 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index b4f0ae1eb1a1..116d9258d836 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -81,10 +81,10 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 }
 
 object SQLMetrics {
-  private val SUM_METRIC = "sum"
-  private val SIZE_METRIC = "size"
-  private val TIMING_METRIC = "timing"
-  private val AVERAGE_METRIC = "average"
+  val SUM_METRIC = "sum"
+  val SIZE_METRIC = "size"
+  val TIMING_METRIC = "timing"
+  val AVERAGE_METRIC = "average"
 
   private val baseForAvgMetric: Int = 10
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 118c82aa75e6..a949c4ea9c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -164,7 +164,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
 
     override def metrics: StateStoreMetrics = {
-      StateStoreMetrics(mapToUpdate.size(), SizeEstimator.estimate(mapToUpdate), Map.empty)
+      StateStoreMetrics(mapToUpdate.size(), SizeEstimator.estimate(mapToUpdate),
+        getCustomMetricsForProvider())
     }
 
     /**
@@ -179,6 +180,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
   }
 
+  def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized {
+    Map(metricProviderLoaderMapSizeBytes -> SizeEstimator.estimate(loadedMaps),
+      metricProviderLoaderCountOfVersionsInMap -> loadedMaps.size)
+  }
+
   /** Get the state store for making updates to create a new `version` of the store. */
   override def getStore(version: Long): StateStore = synchronized {
     require(version >= 0, "Version cannot be less than 0")
@@ -224,7 +230,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
-    Nil
+    metricProviderLoaderMapSizeBytes :: metricProviderLoaderCountOfVersionsInMap :: Nil
   }
 
   override def toString(): String = {
@@ -245,6 +251,14 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
   private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
 
+  private lazy val metricProviderLoaderMapSizeBytes: StateStoreCustomSizeMetric =
+    StateStoreCustomSizeMetric("providerLoadedMapSizeBytes",
+      "estimated size of states cache in provider")
+
+  private lazy val metricProviderLoaderCountOfVersionsInMap: StateStoreCustomAverageMetric =
+    StateStoreCustomAverageMetric("providerLoadedMapCountOfVersions",
+      "count of versions in states cache in provider")
+
   private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)
 
   private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 7eb68c21569b..b940f3f0f7c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -138,6 +138,8 @@ trait StateStoreCustomMetric {
   def name: String
   def desc: String
 }
+
+case class StateStoreCustomAverageMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 6b386308c79f..74a3dd63489e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -269,6 +269,8 @@ class SymmetricHashJoinStateManager(
       keyWithIndexToValueMetrics.numKeys,       // represent each buffered row only once
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
+        case (s @ StateStoreCustomAverageMetric(_, desc), value) =>
+          s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
           s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomTimingMetric(_, desc), value) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 6759fb42b405..82f62022b864 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -90,10 +90,20 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    * the driver after this SparkPlan has been executed and metrics have been updated.
    */
   def getProgress(): StateOperatorProgress = {
+    // average metrics are tricky to aggregate, so just exclude them to keep things simple
+    val avgExcludedCustomMetrics = stateStoreCustomMetrics
+      .filterNot(_._2.metricType == SQLMetrics.AVERAGE_METRIC)
+      .map(entry => entry._1 -> longMetric(entry._1).value)
+
+    val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
+      new java.util.HashMap(avgExcludedCustomMetrics.mapValues(long2Long).asJava)
+
     new StateOperatorProgress(
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
-      memoryUsedBytes = longMetric("stateMemory").value)
+      memoryUsedBytes = longMetric("stateMemory").value,
+      javaConvertedCustomMetrics
+    )
   }
 
   /** Records the duration of running `body` for the next query progress update. */
@@ -107,14 +117,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     val storeMetrics = store.metrics
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-    storeMetrics.customMetrics.foreach { case (metric, value) =>
-      longMetric(metric.name) += value
+    storeMetrics.customMetrics.foreach {
+      case (metric: StateStoreCustomAverageMetric, value) =>
+        longMetric(metric.name).set(value * 1.0d)
+
+      case (metric, value) => longMetric(metric.name) += value
     }
   }
 
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
+      case StateStoreCustomAverageMetric(name, desc) =>
+        name -> SQLMetrics.createAverageMetric(sparkContext, desc)
      case StateStoreCustomSizeMetric(name, desc) =>
         name -> SQLMetrics.createSizeMetric(sparkContext, desc)
       case StateStoreCustomTimingMetric(name, desc) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 0dcb666e2c3e..f13995c74b41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,7 +38,8 @@ import org.apache.spark.annotation.InterfaceStability
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
-    val memoryUsedBytes: Long
+    val memoryUsedBytes: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
   ) extends Serializable {
 
   /** The compact JSON representation of this progress. */
@@ -48,12 +49,24 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
-    ("numRowsTotal" -> JInt(numRowsTotal)) ~
-    ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+      if (map.isEmpty) return JNothing
+      val keys = map.keySet.asScala.toSeq.sorted
+      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
+    }
+
+    val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+
+    if (!customMetrics.isEmpty) {
+      jsonVal ~ ("customMetrics" -> safeMapToJValue[JLong](customMetrics, v => JInt(v.toLong)))
+    } else {
+      jsonVal
+    }
   }
 
   override def toString: String = prettyJson
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 73f870506040..8a9ef73d8dd5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -507,6 +507,81 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
   }
 
+  test("expose metrics with custom metrics to StateStoreMetrics") {
+    val provider = newStoreProvider()
+
+    // Verify state before starting a new set of updates
+    assert(getLatestData(provider).isEmpty)
+
+    val store = provider.getStore(0)
+    assert(!store.hasCommitted)
+    assert(store.metrics.numKeys === 0)
+
+    assert(store.metrics.customMetrics.exists(_._1.name == "providerLoadedMapCountOfVersions"))
+    var loadedMapSize = store.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
+    assert(loadedMapSize.isDefined)
+    val initialLoadedMapSize = loadedMapSize.get._2
+    assert(initialLoadedMapSize >= 0)
+
+    put(store, "a", 1)
+    assert(store.metrics.numKeys === 1)
+
+    put(store, "b", 2)
+    put(store, "aa", 3)
+    assert(store.metrics.numKeys === 3)
+    remove(store, _.startsWith("a"))
+    assert(store.metrics.numKeys === 1)
+    assert(store.commit() === 1)
+
+    assert(store.hasCommitted)
+
+    assert(store.metrics.customMetrics.exists(_._1.name == "providerLoadedMapCountOfVersions"))
+    loadedMapSize = store.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
+    assert(loadedMapSize.isDefined)
+    val loadedMapSizeForVersion1 = loadedMapSize.get._2
+    assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
+
+    val storeV2 = provider.getStore(1)
+    assert(!storeV2.hasCommitted)
+    assert(storeV2.metrics.numKeys === 1)
+
+    put(storeV2, "cc", 4)
+    assert(storeV2.metrics.numKeys === 2)
+    assert(storeV2.commit() === 2)
+
+    assert(storeV2.hasCommitted)
+
+    assert(storeV2.metrics.customMetrics.exists(_._1.name == "providerLoadedMapCountOfVersions"))
+    loadedMapSize = storeV2.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
+    assert(loadedMapSize.isDefined)
+    val loadedMapSizeForVersion1And2 = loadedMapSize.get._2
+    assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
+
+    val reloadedProvider = newStoreProvider(store.id)
+    // intended to load version 2 instead of 1
+    // version 2 will not be loaded to the cache in provider
+    val reloadedStore = reloadedProvider.getStore(1)
+    assert(reloadedStore.metrics.numKeys === 1)
+
+    assert(reloadedStore.metrics.customMetrics
+      .exists(_._1.name == "providerLoadedMapCountOfVersions"))
+    loadedMapSize = reloadedStore.metrics.customMetrics
+      .find(_._1.name == "providerLoadedMapSizeBytes")
+    assert(loadedMapSize.isDefined)
+    assert(loadedMapSize.get._2 === loadedMapSizeForVersion1)
+
+    // now we are loading version 2
+    val reloadedStoreV2 = reloadedProvider.getStore(2)
+    assert(reloadedStoreV2.metrics.numKeys === 2)
+
+    assert(reloadedStoreV2.metrics.customMetrics
+      .exists(_._1.name == "providerLoadedMapCountOfVersions"))
+    loadedMapSize = reloadedStoreV2.metrics.customMetrics
+      .find(_._1.name == "providerLoadedMapSizeBytes")
+    assert(loadedMapSize.isDefined)
+    assert(loadedMapSize.get._2 > loadedMapSizeForVersion1)
+  }
+
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {
     newStoreProvider(opId = Random.nextInt(), partition = 0)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index b96f2bcbdd64..0f15cd6e5a50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("event ordering") {
     val listener = new EventCollector
     withListenerAdded(listener) {
-      for (i <- 1 to 100) {
+      for (i <- 1 to 50) {
         listener.reset()
         require(listener.startEvent === null)
         testStream(MemoryStream[Int].toDS)(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 79bb827e0de9..403b50a949b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -58,7 +58,10 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
       |    "numRowsUpdated" : 1,
-      |    "memoryUsedBytes" : 2
+      |    "memoryUsedBytes" : 2,
+      |    "customMetrics" : {
+      |      "providerLoadedMapSizeBytes" : 3
+      |    }
       |  } ],
       |  "sources" : [ {
       |    "description" : "source",
@@ -230,7 +233,10 @@ object StreamingQueryStatusAndProgressSuite {
       "avg" -> "2016-12-05T20:54:20.827Z",
       "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2,
+      customMetrics = new java.util.HashMap(Map("providerLoadedMapSizeBytes" -> 3L)
+        .mapValues(long2Long).asJava)
+    )),
     sources = Array(
       new SourceProgress(
         description = "source",

From c9aada520889b87ace0886805910f0d56d099bd2 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Tue, 3 Jul 2018 06:06:05 +0900
Subject: [PATCH 2/9] Add two more metrics: state cache hit / miss count in HDFS state provider

---
 .../state/HDFSBackedStateStoreProvider.scala  | 22 ++++++++++--
 .../streaming/state/StateStore.scala          |  1 +
 .../state/SymmetricHashJoinStateManager.scala |  2 ++
 .../streaming/statefulOperators.scala         |  2 ++
 .../streaming/state/StateStoreSuite.scala     | 34 +++++++++++++++++++
 ...StreamingQueryStatusAndProgressSuite.scala |  5 ++-
 6 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index a949c4ea9c6e..71c3e2f83a53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
 import java.util.Locale
+import java.util.concurrent.atomic.LongAdder
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -182,7 +183,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized {
     Map(metricProviderLoaderMapSizeBytes -> SizeEstimator.estimate(loadedMaps),
-      metricProviderLoaderCountOfVersionsInMap -> loadedMaps.size)
+      metricProviderLoaderCountOfVersionsInMap -> loadedMaps.size,
+      metricLoadedMapCacheHit -> loadedMapCacheHitCount.sum(),
+      metricLoadedMapCacheMiss -> loadedMapCacheMissCount.sum())
   }
 
   /** Get the state store for making updates to create a new `version` of the store. */
@@ -230,7 +233,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
-    metricProviderLoaderMapSizeBytes :: metricProviderLoaderCountOfVersionsInMap :: Nil
+    metricProviderLoaderMapSizeBytes :: metricProviderLoaderCountOfVersionsInMap ::
+      metricLoadedMapCacheHit :: metricLoadedMapCacheMiss :: Nil
   }
 
   override def toString(): String = {
@@ -251,6 +255,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
   private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
 
+  private val loadedMapCacheHitCount: LongAdder = new LongAdder
+  private val loadedMapCacheMissCount: LongAdder = new LongAdder
+
   private lazy val metricProviderLoaderMapSizeBytes: StateStoreCustomSizeMetric =
     StateStoreCustomSizeMetric("providerLoadedMapSizeBytes",
       "estimated size of states cache in provider")
@@ -259,6 +266,14 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     StateStoreCustomAverageMetric("providerLoadedMapCountOfVersions",
       "count of versions in states cache in provider")
 
+  private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
+    StateStoreCustomSumMetric("loadedMapCacheHitCount",
+      "count of cache hit on states cache in provider")
+
+  private lazy val metricLoadedMapCacheMiss: StateStoreCustomMetric =
+    StateStoreCustomSumMetric("loadedMapCacheMissCount",
+      "count of cache miss on states cache in provider")
+
   private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)
 
   private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
@@ -290,6 +305,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     // Shortcut if the map for this version is already there to avoid a redundant put.
     val loadedCurrentVersionMap = synchronized { loadedMaps.get(version) }
     if (loadedCurrentVersionMap.isDefined) {
+      loadedMapCacheHitCount.increment()
       return loadedCurrentVersionMap.get
     }
 
@@ -297,6 +313,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
       "Reading snapshot file and delta files if needed..." +
       "Note that this is normal for the first batch of starting query.")
 
+    loadedMapCacheMissCount.increment()
+
     val (result, elapsedMs) = Utils.timeTakenMs {
       val snapshotCurrentVersionMap = readSnapshotFile(version)
       if (snapshotCurrentVersionMap.isDefined) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index b940f3f0f7c8..48198dc81993 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -139,6 +139,7 @@ trait StateStoreCustomMetric {
   def desc: String
 }
 
+case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomAverageMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 74a3dd63489e..ff9de583ecef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -269,6 +269,8 @@ class SymmetricHashJoinStateManager(
       keyWithIndexToValueMetrics.numKeys,       // represent each buffered row only once
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
+        case (s @ StateStoreCustomSumMetric(_, desc), value) =>
+          s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomAverageMetric(_, desc), value) =>
           s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 82f62022b864..b0c8549ff8bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -128,6 +128,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
+      case StateStoreCustomSumMetric(name, desc) =>
+        name -> SQLMetrics.createMetric(sparkContext, desc)
       case StateStoreCustomAverageMetric(name, desc) =>
         name -> SQLMetrics.createAverageMetric(sparkContext, desc)
       case StateStoreCustomSizeMetric(name, desc) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 8a9ef73d8dd5..34767bdcee1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -522,6 +522,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(loadedMapSize.isDefined)
     val initialLoadedMapSize = loadedMapSize.get._2
     assert(initialLoadedMapSize >= 0)
+    var cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
+    assert(cacheHitCount.isDefined)
+    assert(cacheHitCount.get._2 == 0)
+    var cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
+    assert(cacheMissCount.isDefined)
+    assert(cacheMissCount.get._2 == 0)
 
     put(store, "a", 1)
     assert(store.metrics.numKeys === 1)
@@ -540,6 +546,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(loadedMapSize.isDefined)
     val loadedMapSizeForVersion1 = loadedMapSize.get._2
     assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
+    cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
+    assert(cacheHitCount.isDefined)
+    assert(cacheHitCount.get._2 == 0)
+    cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
+    assert(cacheMissCount.isDefined)
+    assert(cacheMissCount.get._2 == 0)
 
     val storeV2 = provider.getStore(1)
     assert(!storeV2.hasCommitted)
@@ -556,6 +568,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(loadedMapSize.isDefined)
     val loadedMapSizeForVersion1And2 = loadedMapSize.get._2
     assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
+    cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
+    assert(cacheHitCount.isDefined)
+    assert(cacheHitCount.get._2 == 1)
+    cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
+    assert(cacheMissCount.isDefined)
+    assert(cacheMissCount.get._2 == 0)
 
     val reloadedProvider = newStoreProvider(store.id)
     // intended to load version 2 instead of 1
@@ -569,6 +587,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       .find(_._1.name == "providerLoadedMapSizeBytes")
     assert(loadedMapSize.isDefined)
     assert(loadedMapSize.get._2 === loadedMapSizeForVersion1)
+    cacheHitCount = reloadedStore.metrics.customMetrics
+      .find(_._1.name == "loadedMapCacheHitCount")
+    assert(cacheHitCount.isDefined)
+    assert(cacheHitCount.get._2 == 0)
+    cacheMissCount = reloadedStore.metrics.customMetrics
+      .find(_._1.name == "loadedMapCacheMissCount")
+    assert(cacheMissCount.isDefined)
+    assert(cacheMissCount.get._2 == 1)
 
     // now we are loading version 2
     val reloadedStoreV2 = reloadedProvider.getStore(2)
@@ -580,6 +606,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       .find(_._1.name == "providerLoadedMapSizeBytes")
     assert(loadedMapSize.isDefined)
     assert(loadedMapSize.get._2 > loadedMapSizeForVersion1)
+    cacheHitCount = reloadedStoreV2.metrics.customMetrics
+      .find(_._1.name == "loadedMapCacheHitCount")
+    assert(cacheHitCount.isDefined)
+    assert(cacheHitCount.get._2 == 0)
+    cacheMissCount = reloadedStoreV2.metrics.customMetrics
+      .find(_._1.name == "loadedMapCacheMissCount")
+    assert(cacheMissCount.isDefined)
+    assert(cacheMissCount.get._2 == 2)
   }
 
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 403b50a949b9..869f1a068188 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -60,6 +60,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |    "numRowsUpdated" : 1,
       |    "memoryUsedBytes" : 2,
       |    "customMetrics" : {
+      |      "loadedMapCacheHitCount" : 1,
+      |      "loadedMapCacheMissCount" : 0,
       |      "providerLoadedMapSizeBytes" : 3
       |    }
       |  } ],
@@ -234,7 +236,8 @@ object StreamingQueryStatusAndProgressSuite {
       "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
       numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2,
-      customMetrics = new java.util.HashMap(Map("providerLoadedMapSizeBytes" -> 3L)
+      customMetrics = new java.util.HashMap(Map("providerLoadedMapSizeBytes" -> 3L,
+        "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
         .mapValues(long2Long).asJava)
     )),
     sources = Array(

From c4a6d11e230759a0d068024cba04c31a0e7903a9 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Wed, 11 Jul 2018 13:22:08 +0900
Subject: [PATCH 3/9] Get rid of metricProviderLoaderCountOfVersionsInMap and also StateStoreCustomAverageMetric

---
 .../streaming/state/HDFSBackedStateStoreProvider.scala    | 8 +-------
 .../spark/sql/execution/streaming/state/StateStore.scala  | 1 -
 .../streaming/state/SymmetricHashJoinStateManager.scala   | 2 --
 .../spark/sql/execution/streaming/statefulOperators.scala | 5 -----
 .../sql/execution/streaming/state/StateStoreSuite.scala   | 7 -------
 5 files changed, 1 insertion(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 71c3e2f83a53..366bb957a8a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -183,7 +183,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized {
     Map(metricProviderLoaderMapSizeBytes -> SizeEstimator.estimate(loadedMaps),
-      metricProviderLoaderCountOfVersionsInMap -> loadedMaps.size,
       metricLoadedMapCacheHit -> loadedMapCacheHitCount.sum(),
       metricLoadedMapCacheMiss -> loadedMapCacheMissCount.sum())
   }
@@ -232,8 +232,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
-    metricProviderLoaderMapSizeBytes :: metricProviderLoaderCountOfVersionsInMap ::
-      metricLoadedMapCacheHit :: metricLoadedMapCacheMiss :: Nil
+    metricProviderLoaderMapSizeBytes :: metricLoadedMapCacheHit :: metricLoadedMapCacheMiss :: Nil
   }
 
   override def toString(): String = {
@@ -262,10 +261,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     StateStoreCustomSizeMetric("providerLoadedMapSizeBytes",
       "estimated size of states cache in provider")
 
-  private lazy val metricProviderLoaderCountOfVersionsInMap: StateStoreCustomAverageMetric =
-    StateStoreCustomAverageMetric("providerLoadedMapCountOfVersions",
-      "count of versions in states cache in provider")
-
   private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
     StateStoreCustomSumMetric("loadedMapCacheHitCount",
       "count of cache hit on states cache in provider")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 48198dc81993..d3313b8a315c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -140,7 +140,6 @@ trait StateStoreCustomMetric {
 }
 
 case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
-case class StateStoreCustomAverageMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index ff9de583ecef..de0cbe81844b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -271,8 +271,6 @@ class SymmetricHashJoinStateManager(
       keyWithIndexToValueMetrics.customMetrics.map {
         case (s @ StateStoreCustomSumMetric(_, desc), value) =>
           s.copy(desc = newDesc(desc)) -> value
-        case (s @ StateStoreCustomAverageMetric(_, desc), value) =>
-          s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
           s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomTimingMetric(_, desc), value) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index b0c8549ff8bc..5174ea94d99b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -118,9 +118,6 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
     storeMetrics.customMetrics.foreach {
-      case (metric: StateStoreCustomAverageMetric, value) =>
-        longMetric(metric.name).set(value * 1.0d)
-
       case (metric, value) => longMetric(metric.name) += value
     }
   }
@@ -130,8 +127,6 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
-      case StateStoreCustomAverageMetric(name, desc) =>
-        name -> SQLMetrics.createAverageMetric(sparkContext, desc)
       case StateStoreCustomSizeMetric(name, desc) =>
         name -> SQLMetrics.createSizeMetric(sparkContext, desc)
       case StateStoreCustomTimingMetric(name, desc) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 34767bdcee1d..69a601476f08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -517,7 +517,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(!store.hasCommitted)
     assert(store.metrics.numKeys === 0)
 
-    assert(store.metrics.customMetrics.exists(_._1.name == "providerLoadedMapCountOfVersions"))
     var loadedMapSize = store.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
     assert(loadedMapSize.isDefined)
     val initialLoadedMapSize = loadedMapSize.get._2
@@ -541,7 +540,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     assert(store.hasCommitted)
 
-    assert(store.metrics.customMetrics.exists(_._1.name == "providerLoadedMapCountOfVersions"))
     loadedMapSize = store.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
     assert(loadedMapSize.isDefined)
     val loadedMapSizeForVersion1 = loadedMapSize.get._2
@@ -563,7 +561,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     assert(storeV2.hasCommitted)
 
-    assert(storeV2.metrics.customMetrics.exists(_._1.name == "providerLoadedMapCountOfVersions"))
     loadedMapSize = storeV2.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
     assert(loadedMapSize.isDefined)
     val loadedMapSizeForVersion1And2 = loadedMapSize.get._2
@@ -581,8 +578,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val reloadedStore = reloadedProvider.getStore(1)
     assert(reloadedStore.metrics.numKeys === 1)
 
-    assert(reloadedStore.metrics.customMetrics
-      .exists(_._1.name == "providerLoadedMapCountOfVersions"))
     loadedMapSize = reloadedStore.metrics.customMetrics
       .find(_._1.name == "providerLoadedMapSizeBytes")
     assert(loadedMapSize.isDefined)
@@ -600,8 +595,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val reloadedStoreV2 = reloadedProvider.getStore(2)
     assert(reloadedStoreV2.metrics.numKeys === 2)
 
-    assert(reloadedStoreV2.metrics.customMetrics
-      .exists(_._1.name == "providerLoadedMapCountOfVersions"))
     loadedMapSize = reloadedStoreV2.metrics.customMetrics
       .find(_._1.name == "providerLoadedMapSizeBytes")
     assert(loadedMapSize.isDefined)

From a149ce54d50cfd0f8fcee06dc0f156c739410ad8 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Wed, 18 Jul 2018 17:01:57 +0900
Subject: [PATCH 4/9] Fix a line which broke the Spark Scala style guide

---
 .../spark/sql/execution/streaming/statefulOperators.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 5174ea94d99b..bd11ffbdedf8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -117,8 +117,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     val storeMetrics = store.metrics
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-    storeMetrics.customMetrics.foreach {
-      case (metric, value) => longMetric(metric.name) += value
+    storeMetrics.customMetrics.foreach { case (metric, value) =>
+      longMetric(metric.name) += value
     }
   }
 

From 36e5a12fa921ab615592cec8aa7745816d7312b7 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Wed, 18 Jul 2018 17:38:13 +0900
Subject: [PATCH 5/9] Refine test code a bit to remove duplicated code

---
 .../streaming/state/StateStoreSuite.scala | 88 ++++++++-----------
 1 file changed, 37 insertions(+), 51 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 69a601476f08..a12afee9c566 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -508,6 +508,32 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
   }
 
   test("expose metrics with custom metrics to StateStoreMetrics") {
+    def getCustomMetric(metrics: StateStoreMetrics, name: String): Long = {
+      val metricPair = metrics.customMetrics.find(_._1.name == name)
+      assert(metricPair.isDefined)
+      metricPair.get._2
+    }
+
+    def getLoadedMapSizeMetric(metrics: StateStoreMetrics): Long = {
+      getCustomMetric(metrics, "providerLoadedMapSizeBytes")
+    }
+
+    def getCacheHitCountMetric(metrics: StateStoreMetrics): Long = {
+      getCustomMetric(metrics, "loadedMapCacheHitCount")
+    }
+
+    def getCacheMissCountMetric(metrics: StateStoreMetrics): Long = {
+      getCustomMetric(metrics, "loadedMapCacheMissCount")
+    }
+
+    def assertCacheHitAndMissMetrics(
+        metrics: StateStoreMetrics,
+        expectedCacheHitCount: Long,
+        expectedCacheMissCount: Long): Unit = {
+      assert(getCacheHitCountMetric(metrics) === expectedCacheHitCount)
+      assert(getCacheMissCountMetric(metrics) === expectedCacheMissCount)
+    }
+
     val provider = newStoreProvider()
 
     // Verify state before starting a new set of updates
@@ -515,18 +541,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     val store = provider.getStore(0)
     assert(!store.hasCommitted)
+
     assert(store.metrics.numKeys === 0)
 
-    var loadedMapSize = store.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
-    assert(loadedMapSize.isDefined)
-    val initialLoadedMapSize = loadedMapSize.get._2
+    val initialLoadedMapSize = getLoadedMapSizeMetric(store.metrics)
     assert(initialLoadedMapSize >= 0)
-    var cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
-    assert(cacheHitCount.isDefined)
-    assert(cacheHitCount.get._2 == 0)
-    var cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
-    assert(cacheMissCount.isDefined)
-    assert(cacheMissCount.get._2 == 0)
+    assertCacheHitAndMissMetrics(store.metrics, 0, 0)
 
     put(store, "a", 1)
     assert(store.metrics.numKeys === 1)
@@ -540,16 +560,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     assert(store.hasCommitted)
 
-    loadedMapSize = store.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
-    assert(loadedMapSize.isDefined)
-    val loadedMapSizeForVersion1 = loadedMapSize.get._2
+    val loadedMapSizeForVersion1 = getLoadedMapSizeMetric(store.metrics)
     assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
-    cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
-    assert(cacheHitCount.isDefined)
-    assert(cacheHitCount.get._2 == 0)
-    cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
-    assert(cacheMissCount.isDefined)
-    assert(cacheMissCount.get._2 == 0)
+    assertCacheHitAndMissMetrics(store.metrics, 0, 0)
 
     val storeV2 = provider.getStore(1)
     assert(!storeV2.hasCommitted)
@@ -561,16 +574,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     assert(storeV2.hasCommitted)
 
-    loadedMapSize = storeV2.metrics.customMetrics.find(_._1.name == "providerLoadedMapSizeBytes")
-    assert(loadedMapSize.isDefined)
-    val loadedMapSizeForVersion1And2 = loadedMapSize.get._2
+    val loadedMapSizeForVersion1And2 = getLoadedMapSizeMetric(storeV2.metrics)
    assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
-    cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
-    assert(cacheHitCount.isDefined)
-    assert(cacheHitCount.get._2 == 1)
-    cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
-    assert(cacheMissCount.isDefined)
-    assert(cacheMissCount.get._2 == 0)
+    assertCacheHitAndMissMetrics(storeV2.metrics, 1, 0)
 
     val reloadedProvider = newStoreProvider(store.id)
     // intended to load version 2 instead of 1
@@ -578,35 +584,15 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val reloadedStore = reloadedProvider.getStore(1)
     assert(reloadedStore.metrics.numKeys === 1)
 
-    loadedMapSize = reloadedStore.metrics.customMetrics
-      .find(_._1.name == "providerLoadedMapSizeBytes")
-    assert(loadedMapSize.isDefined)
-    assert(loadedMapSize.get._2 === loadedMapSizeForVersion1)
+    assert(getLoadedMapSizeMetric(reloadedStore.metrics) === loadedMapSizeForVersion1)
-    cacheHitCount = reloadedStore.metrics.customMetrics
-      .find(_._1.name == "loadedMapCacheHitCount")
-    assert(cacheHitCount.isDefined)
-    assert(cacheHitCount.get._2 == 0)
-    cacheMissCount = reloadedStore.metrics.customMetrics
-      .find(_._1.name == "loadedMapCacheMissCount")
-    assert(cacheMissCount.isDefined)
-    assert(cacheMissCount.get._2 == 1)
+    assertCacheHitAndMissMetrics(reloadedStore.metrics, 0, 1)
 
     // now we are loading version 2
     val reloadedStoreV2 = reloadedProvider.getStore(2)
     assert(reloadedStoreV2.metrics.numKeys === 2)
 
-    loadedMapSize = reloadedStoreV2.metrics.customMetrics
-      .find(_._1.name == "providerLoadedMapSizeBytes")
-    assert(loadedMapSize.isDefined)
-    assert(loadedMapSize.get._2 > loadedMapSizeForVersion1)
+    assert(getLoadedMapSizeMetric(reloadedStoreV2.metrics) > loadedMapSizeForVersion1)
-    cacheHitCount = reloadedStoreV2.metrics.customMetrics
-      .find(_._1.name == "loadedMapCacheHitCount")
-    assert(cacheHitCount.isDefined)
-    assert(cacheHitCount.get._2 == 0)
-    cacheMissCount = reloadedStoreV2.metrics.customMetrics
-      .find(_._1.name == "loadedMapCacheMissCount")
-    assert(cacheMissCount.isDefined)
-    assert(cacheMissCount.get._2 == 2)
+    assertCacheHitAndMissMetrics(reloadedStoreV2.metrics, 0, 2)
   }
 
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {

From 5b203d4967eda3a09f7c8d83cf86e7ac6a427182 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Wed, 18 Jul 2018 17:40:37 +0900
Subject: [PATCH 6/9] Refine a bit more

---
 .../streaming/state/StateStoreSuite.scala | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index a12afee9c566..52f47935c2a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -518,20 +518,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       getCustomMetric(metrics, "providerLoadedMapSizeBytes")
     }
 
-    def getCacheHitCountMetric(metrics: StateStoreMetrics): Long = {
-      getCustomMetric(metrics, "loadedMapCacheHitCount")
-    }
-
-    def getCacheMissCountMetric(metrics: StateStoreMetrics): Long = {
-      getCustomMetric(metrics, "loadedMapCacheMissCount")
-    }
-
     def assertCacheHitAndMissMetrics(
         metrics: StateStoreMetrics,
         expectedCacheHitCount: Long,
         expectedCacheMissCount: Long): Unit = {
-      assert(getCacheHitCountMetric(metrics) === expectedCacheHitCount)
-      assert(getCacheMissCountMetric(metrics) === expectedCacheMissCount)
+      val cacheHitCount = getCustomMetric(metrics, "loadedMapCacheHitCount")
+      val cacheMissCount = getCustomMetric(metrics, "loadedMapCacheMissCount")
+      assert(cacheHitCount === expectedCacheHitCount)
+      assert(cacheMissCount === expectedCacheMissCount)
     }
 
     val provider = newStoreProvider()

From 32d041878b7dcd20794250853063dab4bac09118 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Thu, 19 Jul 2018 10:43:05 +0900
Subject: [PATCH 7/9] Refine a little more

---
 .../streaming/state/StateStoreSuite.scala | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 52f47935c2a7..3002ef089aa2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -518,7 +518,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       getCustomMetric(metrics, "providerLoadedMapSizeBytes")
     }
 
-    def assertCacheHitAndMissMetrics(
+    def assertCacheHitAndMiss(
         metrics: StateStoreMetrics,
         expectedCacheHitCount: Long,
         expectedCacheMissCount: Long): Unit = {
@@ -540,7 +540,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     val initialLoadedMapSize = getLoadedMapSizeMetric(store.metrics)
     assert(initialLoadedMapSize >= 0)
-    assertCacheHitAndMissMetrics(store.metrics, 0, 0)
+    assertCacheHitAndMiss(store.metrics, expectedCacheHitCount = 0, expectedCacheMissCount = 0)
 
     put(store, "a", 1)
     assert(store.metrics.numKeys === 1)
@@ -556,7 +556,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     val loadedMapSizeForVersion1 = getLoadedMapSizeMetric(store.metrics)
     assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
-    assertCacheHitAndMissMetrics(store.metrics, 0, 0)
+    assertCacheHitAndMiss(store.metrics, expectedCacheHitCount = 0, expectedCacheMissCount = 0)
 
     val storeV2 = provider.getStore(1)
     assert(!storeV2.hasCommitted)
@@ -570,7 +570,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
     val loadedMapSizeForVersion1And2 = getLoadedMapSizeMetric(storeV2.metrics)
     assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
-    assertCacheHitAndMissMetrics(storeV2.metrics, 1, 0)
+    assertCacheHitAndMiss(storeV2.metrics, expectedCacheHitCount = 1, expectedCacheMissCount = 0)
 
     val reloadedProvider = newStoreProvider(store.id)
     // intended to load version 2 instead of 1
@@ -579,14 +579,16 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(reloadedStore.metrics.numKeys === 1)
 
     assert(getLoadedMapSizeMetric(reloadedStore.metrics) === loadedMapSizeForVersion1)
-    assertCacheHitAndMissMetrics(reloadedStore.metrics, 0, 1)
+    assertCacheHitAndMiss(reloadedStore.metrics, expectedCacheHitCount = 0,
+      expectedCacheMissCount = 1)
 
     // now we are loading version 2
     val reloadedStoreV2 = reloadedProvider.getStore(2)
     assert(reloadedStoreV2.metrics.numKeys === 2)
 
     assert(getLoadedMapSizeMetric(reloadedStoreV2.metrics) > loadedMapSizeForVersion1)
-    assertCacheHitAndMissMetrics(reloadedStoreV2.metrics, 0, 2)
+    assertCacheHitAndMiss(reloadedStoreV2.metrics, expectedCacheHitCount = 0,
+      expectedCacheMissCount = 2)
   }
 
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {

From ed072fcf057f982275d0daf69787ed812f03e87b Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Wed, 1 Aug 2018 14:23:36 +0900
Subject: [PATCH 8/9] Address review comments from @tdas

---
 .../sql/execution/metric/SQLMetrics.scala     |  8 +++---
 .../streaming/statefulOperators.scala         |  6 ++---
 .../apache/spark/sql/streaming/progress.scala | 26 ++++++++-----------
 3 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 116d9258d836..b4f0ae1eb1a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -81,10 +81,10 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 }
 
 object SQLMetrics {
-  val SUM_METRIC = "sum"
-  val SIZE_METRIC = "size"
-  val TIMING_METRIC = "timing"
-  val AVERAGE_METRIC = "average"
+  private val SUM_METRIC = "sum"
+  private val SIZE_METRIC = "size"
+  private val TIMING_METRIC = "timing"
+  private val AVERAGE_METRIC = "average"
 
   private val baseForAvgMetric: Int = 10
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index bd11ffbdedf8..2783ca2987eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -90,13 +90,11 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    * the driver after this SparkPlan has been executed and metrics have been updated.
    */
   def getProgress(): StateOperatorProgress = {
-    // average metrics are tricky to aggregate, so just exclude them to keep things simple
-    val avgExcludedCustomMetrics = stateStoreCustomMetrics
-      .filterNot(_._2.metricType == SQLMetrics.AVERAGE_METRIC)
+    val customMetrics = stateStoreCustomMetrics
       .map(entry => entry._1 -> longMetric(entry._1).value)
 
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
-      new java.util.HashMap(avgExcludedCustomMetrics.mapValues(long2Long).asJava)
+      new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)
 
     new StateOperatorProgress(
       numRowsTotal = longMetric("numTotalStateRows").value,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index f13995c74b41..f2173aa1e59c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -52,21 +52,17 @@ class StateOperatorProgress private[sql](
     new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.keySet.asScala.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
-    val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~
-      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-      ("memoryUsedBytes" -> JInt(memoryUsedBytes))
-
-    if (!customMetrics.isEmpty) {
-      jsonVal ~ ("customMetrics" -> safeMapToJValue[JLong](customMetrics, v => JInt(v.toLong)))
-    } else {
-      jsonVal
-    }
+    ("numRowsTotal" -> JInt(numRowsTotal)) ~
+    ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+    ("customMetrics" -> {
+      if (!customMetrics.isEmpty) {
+        val keys = customMetrics.keySet.asScala.toSeq.sorted
+        keys.map { k => k -> JInt(customMetrics.get(k).toLong) : JObject }.reduce(_ ~ _)
+      } else {
+        JNothing
+      }
+    })
   }
 
   override def toString: String = prettyJson

From 545081e4b7a9bb37442f7d558bc28db790742610 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Tue, 21 Aug 2018 19:43:38 +0900
Subject: [PATCH 9/9] Use memoryUsedBytes for the total memory usage of loaded versions, and customMetric.stateOnCurrentVersionSizeBytes for the memory usage of the current version

---
 .../state/HDFSBackedStateStoreProvider.scala  | 29 ++++++++++++-------
 .../streaming/state/StateStoreSuite.scala     | 18 +++++++++++-
 ...StreamingQueryStatusAndProgressSuite.scala |  8 ++---
 3 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 366bb957a8a0..071784b08ee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -165,8 +165,16 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
 
     override def metrics: StateStoreMetrics = {
-      StateStoreMetrics(mapToUpdate.size(), SizeEstimator.estimate(mapToUpdate),
-        getCustomMetricsForProvider())
+      // NOTE: we provide estimation of cache size as "memoryUsedBytes", and size of state for
+      // current version as "stateOnCurrentVersionSizeBytes"
+      val metricsFromProvider: Map[String, Long] = getMetricsForProvider()
+
+      val customMetrics = metricsFromProvider.flatMap { case (name, value) =>
+        // just allow searching from list because the list is small enough
+        supportedCustomMetrics.find(_.name == name).map(_ -> value)
+      } + (metricStateOnCurrentVersionSizeBytes -> SizeEstimator.estimate(mapToUpdate))
+
+      StateStoreMetrics(mapToUpdate.size(), metricsFromProvider("memoryUsedBytes"), customMetrics)
     }
 
     /**
@@ -181,10 +189,10 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
   }
 
-  def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized {
-    Map(metricProviderLoaderMapSizeBytes -> SizeEstimator.estimate(loadedMaps),
-      metricLoadedMapCacheHit -> loadedMapCacheHitCount.sum(),
-      metricLoadedMapCacheMiss -> loadedMapCacheMissCount.sum())
+  def getMetricsForProvider(): Map[String, Long] = synchronized {
+    Map("memoryUsedBytes" -> SizeEstimator.estimate(loadedMaps),
+      metricLoadedMapCacheHit.name -> loadedMapCacheHitCount.sum(),
+      metricLoadedMapCacheMiss.name -> loadedMapCacheMissCount.sum())
   }
 
   /** Get the state store for making updates to create a new `version` of the store. */
@@ -232,7 +240,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
-    metricProviderLoaderMapSizeBytes :: metricLoadedMapCacheHit :: metricLoadedMapCacheMiss :: Nil
+    metricStateOnCurrentVersionSizeBytes :: metricLoadedMapCacheHit :: metricLoadedMapCacheMiss ::
+      Nil
   }
 
   override def toString(): String = {
@@ -256,9 +265,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   private val loadedMapCacheHitCount: LongAdder = new LongAdder
   private val loadedMapCacheMissCount: LongAdder = new LongAdder
 
-  private lazy val metricProviderLoaderMapSizeBytes: StateStoreCustomSizeMetric =
-    StateStoreCustomSizeMetric("providerLoadedMapSizeBytes",
-      "estimated size of states cache in provider")
+  private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
+    StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
+      "estimated size of state only on current version")
 
   private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
     StateStoreCustomSumMetric("loadedMapCacheHitCount",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 3002ef089aa2..7886b1bd277f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -193,6 +193,22 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(store.metrics.memoryUsedBytes > noDataMemoryUsed)
   }
 
+  test("reports memory usage on current version") {
+    def getSizeOfStateForCurrentVersion(metrics: StateStoreMetrics): Long = {
+      val metricPair = metrics.customMetrics.find(_._1.name == "stateOnCurrentVersionSizeBytes")
+      assert(metricPair.isDefined)
+      metricPair.get._2
+    }
+
+    val provider = newStoreProvider()
+    val store = provider.getStore(0)
+    val noDataMemoryUsed = getSizeOfStateForCurrentVersion(store.metrics)
+
+    put(store, "a", 1)
+    store.commit()
+    assert(getSizeOfStateForCurrentVersion(store.metrics) > noDataMemoryUsed)
+  }
+
   test("StateStore.get") {
     quietly {
       val dir = newDir()
@@ -515,7 +531,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
 
     def getLoadedMapSizeMetric(metrics: StateStoreMetrics): Long = {
-      getCustomMetric(metrics, "providerLoadedMapSizeBytes")
+      metrics.memoryUsedBytes
     }
 
     def assertCacheHitAndMiss(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 869f1a068188..7bef687e7e43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -58,11 +58,11 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
       |    "numRowsUpdated" : 1,
-      |    "memoryUsedBytes" : 2,
+      |    "memoryUsedBytes" : 3,
       |    "customMetrics" : {
       |      "loadedMapCacheHitCount" : 1,
       |      "loadedMapCacheMissCount" : 0,
-      |      "providerLoadedMapSizeBytes" : 3
+      |      "stateOnCurrentVersionSizeBytes" : 2
      |    }
       |  } ],
@@ -235,8 +235,8 @@ object StreamingQueryStatusAndProgressSuite {
       "avg" -> "2016-12-05T20:54:20.827Z",
       "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2,
-      customMetrics = new java.util.HashMap(Map("providerLoadedMapSizeBytes" -> 3L,
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3,
+      customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
         .mapValues(long2Long).asJava)
     )),