[SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider #21700
Conversation
Pasting the JIRA issue description to explain why this patch is needed: the default value of "spark.sql.streaming.minBatchesToRetain" is high (100). This doesn't strictly require 100x of memory, but I'm seeing 10x ~ 80x memory consumption for various workloads. In addition, in some cases requiring even 2x of memory is unacceptable, so we should split out a configuration for memory and let users adjust the trade-off between memory usage and cache misses (rebuilding state from files). In the normal case, the default value '2' would cover both cases (success, and restoring from failure) with less than or around 2x of memory usage, and '1' would only cover the success case but no longer require more than 1x of memory. In the extreme case, a user can set the value to '0' to completely disable the map cache and maximize executor memory usage (covers #21500).
Test build #92546 has finished for PR 21700 at commit

retest this, please

Test build #92547 has finished for PR 21700 at commit
…ackedStateStoreProvider

* introduce BoundedSortedMap which implements a bounded-size sorted map
* only the first N elements will be retained
* replace loadedMaps with BoundedSortedMap to retain only N versions of states
* no need to clean up in the maintenance phase
* introduce new configuration: spark.sql.streaming.minBatchesToRetainInMemory
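The bounded behavior described in the commit summary can be sketched roughly as follows. This is a hypothetical, simplified version (class name and details are mine, not the PR's actual code): a `TreeMap` wrapper whose `put` keeps only the first N entries under the map's ordering.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of a bounded sorted map: only the first `limit` entries
// (per the map's ordering) are retained; later-sorting entries are evicted,
// or never inserted at all.
class BoundedSortedMapSketch<K, V> extends TreeMap<K, V> {
    private final int limit;

    BoundedSortedMapSketch(Comparator<? super K> comparator, int limit) {
        super(comparator);
        this.limit = limit;
    }

    @Override
    public V put(K key, V value) {
        if (limit <= 0) {
            return null;  // capacity zero: caching disabled entirely
        }
        if (size() >= limit && !containsKey(key)) {
            // If the new key would sort after the current last key, inserting
            // it would just evict it again immediately: skip the insert.
            if (comparator().compare(key, lastKey()) > 0) {
                return null;
            }
            // Otherwise make room by dropping the last (lowest-priority) entry.
            remove(lastKey());
        }
        return super.put(key, value);
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
        // Defer to put() so the size bound is always enforced.
        for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
            put(entry.getKey(), entry.getValue());
        }
    }
}
```

With a reversed comparator (the Java analogue of `Ordering[Long].reverse`), `firstKey()` is the newest version, and only the latest N versions survive.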
Missing newline at EOF for two new Java files. Just addressed.

Test build #92548 has finished for PR 21700 at commit

Test build #92549 has finished for PR 21700 at commit

Test build #92550 has finished for PR 21700 at commit

Test build #92587 has finished for PR 21700 at commit

Test build #92588 has finished for PR 21700 at commit
```java
@Override
public void putAll(Map<? extends K, ? extends V> map) {
```
Should the `map` parameter be of type `SortedMap`? With an ordinary `Map`, the traversal order is not fixed, so it may produce a non-deterministic result if the map's size is bigger than this BoundedSortedMap's size.
Unfortunately this signature is inherited from the `Map` interface, so we can't modify it. And assuming `put` is implemented correctly, this still guarantees the size bound of BoundedSortedMap, since `putAll` defers to `put`, which restricts the map's size.
```java
@Override
public void putAll(Map<? extends K, ? extends V> map) {
    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
```
I can think of an optimization here: if the map's size is bigger than or equal to this BoundedSortedMap's bound, you can call `clear` on this sorted map first when `map.lastKey()` is lower than `this.firstKey()`, since all of this sorted map's elements would be evicted anyway. On the other hand, if `map.firstKey()` is higher than `this.lastKey()` and this sorted map is at full capacity, there is no need to enter the loop: no element from `map` would be taken anyway.
Thanks for the great suggestion. While we can't assume that the map's type is SortedMap, it looks like we can check the type of the map at runtime and apply your suggestion. Will apply it.
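A sketch of how those short-circuits might look, assuming for illustration an ascending ordering where the bounded target keeps its first `limit` (smallest) keys; all names here are hypothetical, not the PR's actual code:

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative short-circuits for putAll when the source is itself a
// SortedMap sharing the target's ordering.
final class PutAllShortCircuit {
    static <K extends Comparable<K>, V> void boundedPutAll(
            TreeMap<K, V> target, SortedMap<K, ? extends V> source, int limit) {
        if (source.isEmpty()) {
            return;
        }
        if (!target.isEmpty() && source.size() >= limit
                && source.lastKey().compareTo(target.firstKey()) < 0) {
            // Every existing entry would be evicted anyway: drop them up front.
            target.clear();
        } else if (target.size() >= limit
                && source.firstKey().compareTo(target.lastKey()) > 0) {
            // No entry from `source` can displace anything: skip the loop.
            return;
        }
        for (Map.Entry<K, ? extends V> entry : source.entrySet()) {
            if (target.size() >= limit && !target.containsKey(entry.getKey())) {
                if (entry.getKey().compareTo(target.lastKey()) > 0) {
                    continue;  // would be evicted immediately
                }
                target.remove(target.lastKey());
            }
            target.put(entry.getKey(), entry.getValue());
        }
    }
}
```

The two whole-map comparisons cost O(log n) each, so in the disjoint-range cases the per-entry loop (and its repeated evictions) is avoided entirely.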
```scala
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
// taking default value first: this will be updated by init method with configuration
@volatile private var numberOfVersionsRetainInMemory: Int = 2
```
`numberOfVersionsRetainInMemory` -> `numberOfVersionsToRetainInMemory`
Will fix.
@tedyu Thanks for the detailed review comments. Addressed.

Test build #92645 has finished for PR 21700 at commit

Test build #92647 has finished for PR 21700 at commit

Test build #92649 has finished for PR 21700 at commit

Test build #92829 has finished for PR 21700 at commit

Test build #92830 has finished for PR 21700 at commit

Test build #92832 has finished for PR 21700 at commit
```diff
  @volatile private var numberOfVersionsToRetainInMemory: Int = _

- private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
+ private lazy val loadedMaps = new util.TreeMap[Long, MapType](Ordering[Long].reverse)
```
Just FYI: referring to java.util.TreeMap is unavoidable because Scala doesn't support mutable SortedMap until Scala 2.12+, which would need additional changes for interop.
Yeah, I was wondering about that. Makes sense.
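For reference, the Java equivalent of `new util.TreeMap[Long, MapType](Ordering[Long].reverse)` is a `TreeMap` built with a reversed comparator, so the newest version sits at `firstKey()`. A minimal illustration (names are mine):

```java
import java.util.Comparator;
import java.util.TreeMap;

// A descending TreeMap keyed by version: firstKey() is the newest
// retained version, lastKey() the oldest.
final class ReverseOrderedVersions {
    static TreeMap<Long, String> newLoadedMaps() {
        return new TreeMap<>(Comparator.reverseOrder());
    }
}
```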
```scala
  require(!StateStore.isMaintenanceRunning)
}

test("retaining only latest configured size of versions in memory") {
```
Sorry I didn't catch this earlier. We should ideally have tests that directly validate the specific behaviors we're documenting in the conf:
- '2' will read from cache in the direct failure case
- '1' will read from cache in the happy path but not if there's a failure
- '0' will never read from the cache, and more importantly will maximize memory by never populating it
It is fairly easy to check whether we read from cache or from files with c9aada5 in #21469, since it introduces metrics for cache hits and cache misses, but it's not easy to check in this PR itself. So I just rely on inspecting the cache to ensure the data is correctly evicted and not available in the cache, as expected. Hope this is OK.

Btw, I caught a silly bug while adding tests to cover your suggestion. Thanks!
…and fix a silly bug
```scala
var currentVersion = 0

def restoreOriginValues(map: provider.MapType): Map[String, Int] = {
```
I've allowed the redundant function definition because there's no way to use provider.MapType as a parameter type unless provider is defined. If we really want to get rid of the redundant definition, we may have to change it to ConcurrentMap directly.
+1 on making it ConcurrentMap. Maybe even better, you can use Scala implicit classes to add methods to HDFSBackedStateStoreProvider:

```scala
implicit class ProviderHelper(provider: StateStoreProvider) {
  def toStringIntMap(): Map[String, Int] = { ... }
}
```

This should avoid the problem. Either way, I hate having these duplicate methods, so we should fix it one way or the other.
On second thought, if you make the convenience method checkVersion I mentioned above, you may not have to do this at all.
@jose-torres Addressed review comment. Please take a look again.

Test build #92904 has finished for PR 21700 at commit
tdas left a comment:

Overall looks good! Some nits to solve, mainly on the test code.
```scala
if (size == numberOfVersionsToRetainInMemory) {
  val versionIdForLastKey = loadedMaps.lastKey()
  if (versionIdForLastKey > newVersion) {
    // this is the only case which put doesn't need
```
Can you clarify when this case can happen?
Will update the comment to clarify a bit more. We just avoid the case where the element would be added at the end and then be required to be evicted right away.
```scala
  loadedMaps.clone().asInstanceOf[util.SortedMap[Long, MapType]]
}

private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized {
```
`putStateIntoStateCache` -> `cacheMap`, to keep consistent with `loadedMaps` etc.
```scala
}

def updateVersionTo(provider: StateStoreProvider, currentVersion: => Int,
  targetVersion: Int): Int = {
```
This is incorrect indenting by the Spark style guide. It should be:

```scala
def updateVersionTo(
    provider: StateStoreProvider,
    currentVersion: => Int,
    targetVersion: Int): Int = {
```
Why are you using `=> Int` for `currentVersion` instead of simply using `Int`?
Also, since you are frequently incrementing the version by 1 (i.e. targetVersion = currentVersion + 1, always), you can add another convenience method called incrementVersion(provider, currentVersion)
Thanks for pointing out the style guide issue. Will fix.

Regarding `currentVersion: => Int`: at some point I was trying to modify currentVersion itself, stuck with the current approach, and didn't roll it back. Will fix.

And I agree it would be better to have incrementVersion to shorten the code. Will address.
```scala
  map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap
}

var currentVersion = 0
```
Nit: please add comments on each section here to make it clear what are you testing
```scala
loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 2)
assert(loadedMaps.firstKey() === 2L)
assert(loadedMaps.lastKey() === 1L)
```
You can make this a convenience function: `def checkLoadedVersions(num: Int, earliest: Int, latest: Int)`.
```scala
assert(loadedMaps.firstKey() === 3L)
assert(loadedMaps.lastKey() === 2L)
assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3))
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
```
This can be boiled down to a convenience method as well to reduce the verbosity: `def checkVersion(version: Int, expectedData: Map[String, Int])`.
@tdas Thanks for the detailed review! Addressed review comments.

Test build #93163 has finished for PR 21700 at commit
tdas left a comment:

Changes look great! Just a couple of more nits and we are good to go.
```scala
currentVersion = incrementVersion(provider, currentVersion)
assert(getData(provider) === Set("a" -> 1))
var loadedMaps = provider.getClonedLoadedMaps()
checkLoadedVersions(loadedMaps, 1, 1L, 1L)
```
Can you make these `checkLoadedVersions(loadedMaps, count = 1, min = 1L, max = 1L)` so that it's obvious while reading what those numbers are?
Also, does it need the `L` suffix? It seems like they are everywhere and they really don't need to be.
Yeah, I'd add `L` everywhere the literal should be a Long, so that we don't rely on auto-casting and are explicit about the type, but I have no strong opinion about this. I can follow existing Spark preferences.
```scala
}

/** This method is intended to be only used for unit test(s). DO NOT TOUCH ELEMENTS IN MAP! */
private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] = synchronized {
```
Just `getLoadedMaps()` is fine. The fact that it's cloned is just an implementation detail.
Agreed. Will address.
@tdas Addressed review comments. Please take a look again. Thanks in advance!

Test build #93256 has finished for PR 21700 at commit

LGTM! I am merging it! Thank you for all the hard work. And my apologies for not being able to give it time earlier to review it.

My pleasure. Thanks all for spending your time to review thoughtfully, and merge this!
… HDFSBackedStateStoreProvider

This patch proposes breaking down the configuration of retained batch size on state into two pieces: files and in-memory (cache). While this patch reuses the existing configuration for files, it introduces a new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory", to configure the max count of batches to retain in memory.

Apply this patch on top of SPARK-24441 (apache#21469), and manually tested in various workloads to ensure the overall size of states in memory is around 2x or less of the size of the latest version of state, while it was 10x ~ 80x before applying the patch.

Author: Jungtaek Lim <[email protected]>

Closes apache#21700 from HeartSaVioR/SPARK-24717.
What changes were proposed in this pull request?

This patch proposes breaking down the configuration of retained batch size on state into two pieces: files and in-memory (cache). While this patch reuses the existing configuration for files, it introduces a new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory", to configure the max count of batches to retain in memory.
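To make the split concrete, the two knobs might be set together like this (a sketch; the values shown are illustrative, not recommendations):

```
# Batches retained as snapshot/delta files on the checkpoint location (existing).
spark.sql.streaming.minBatchesToRetain            100

# Versions of state retained in the executor's in-memory cache (new in this PR).
# 2 covers success plus restore-after-failure, 1 covers only the success path,
# 0 disables the cache entirely.
spark.sql.streaming.maxBatchesToRetainInMemory    2
```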
How was this patch tested?

Applied this patch on top of SPARK-24441 (#21469), and manually tested in various workloads to ensure the overall size of states in memory is around 2x or less of the size of the latest version of state, while it was 10x ~ 80x before applying the patch.